diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9449cd571a1..32d6b50f647 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -158,6 +158,7 @@ class IColumn; \ M(UInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \ M(Milliseconds, insert_quorum_timeout, 600000, "", 0) \ + M(Bool, insert_quorum_parallel, false, "For quorum INSERT queries - enable to make parallel inserts without linearizability", 0) \ M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \ M(UInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.", 0) \ M(Milliseconds, read_backoff_min_latency_ms, 1000, "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.", 0) \ diff --git a/src/Storages/MergeTree/MergeTreePartInfo.h b/src/Storages/MergeTree/MergeTreePartInfo.h index 416308861b7..d538418c29b 100644 --- a/src/Storages/MergeTree/MergeTreePartInfo.h +++ b/src/Storages/MergeTree/MergeTreePartInfo.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -18,6 +19,8 @@ struct MergeTreePartInfo Int64 max_block = 0; UInt32 level = 0; Int64 mutation = 0; /// If the part has been mutated or contains mutated parts, is equal to mutation version number. + std::optional block_id; /// hex if write with quorum and min_block == max_block +/// shouldn't be here... MergeTreePartInfo() = default; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockEntry.h b/src/Storages/MergeTree/ReplicatedMergeTreeBlockEntry.h new file mode 100644 index 00000000000..2677323ae89 --- /dev/null +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockEntry.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** To implement the functionality of the "quorum write". + * Information about which replicas the inserted part of data appeared on, + * and on how many replicas it should be. + */ +struct ReplicatedMergeTreeBlockEntry +{ + String part_name; + std::optional quorum_status; + + ReplicatedMergeTreeBlockEntry() {} + ReplicatedMergeTreeBlockEntry(const String & str) + { + fromString(str); + } + + void writeText(WriteBuffer & out) const + { + out << part_name << "\n"; + + if (quorum_status) + quorum_status->writeText(out); + } + + void readText(ReadBuffer & in) + { + in >> part_name; + + if (!in.eof()) + { + in >> "\n"; + quorum_status = ReplicatedMergeTreeQuorumStatusEntry(); + quorum_status->readText(in); + } + } + + String toString() const + { + WriteBufferFromOwnString out; + writeText(out); + return out.str(); + } + + void fromString(const String & str) + { + ReadBufferFromString in(str); + readText(in); + } +}; + +} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 5696a9cf890..f56176f4ff6 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -39,12 +40,14 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream( size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, + bool quorum_parallel_, bool deduplicate_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) , quorum(quorum_) , quorum_timeout_ms(quorum_timeout_ms_) , max_parts_per_block(max_parts_per_block_) + , quorum_parallel(quorum_parallel_) , deduplicate(deduplicate_) , log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)")) { @@ -243,15 +246,21 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( Int64 block_number = 0; String existing_part_name; + ReplicatedMergeTreeBlockEntry block_entry; if (block_number_lock) { is_already_existing_part = false; block_number = block_number_lock->getNumber(); + block_entry.part_name = part->name; /// Set part attributes according to part_number. Prepare an entry for log. part->info.min_block = block_number; part->info.max_block = block_number; + + /// ALEXELEXA + /// somehow need to send this block_if to part node. TODO + part->info.block_id = block_id; part->info.level = 0; part->name = part->getNewName(part->info); @@ -282,10 +291,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( */ if (quorum) { - ReplicatedMergeTreeQuorumEntry quorum_entry; - quorum_entry.part_name = part->name; - quorum_entry.required_number_of_replicas = quorum; - quorum_entry.replicas.insert(storage.replica_name); + ReplicatedMergeTreeQuorumStatusEntry status_entry; + status_entry.required_number_of_replicas = quorum; + status_entry.replicas.insert(storage.replica_name); /** At this point, this node will contain information that the current replica received a part. * When other replicas will receive this part (in the usual way, processing the replication log), @@ -294,11 +302,21 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( * which indicates that the quorum has been reached. */ - ops.emplace_back( - zkutil::makeCreateRequest( - quorum_info.status_path, - quorum_entry.toString(), - zkutil::CreateMode::Persistent)); + if (!quorum_parallel) + { + ReplicatedMergeTreeQuorumEntry quorum_entry; + quorum_entry.part_name = part->name; + quorum_entry.status = status_entry; + + ops.emplace_back( + zkutil::makeCreateRequest( + quorum_info.status_path, + quorum_entry.toString(), + zkutil::CreateMode::Persistent)); + + } + else + block_entry.quorum_status = status_entry; /// Make sure that during the insertion time, the replica was not reinitialized or disabled (when the server is finished). ops.emplace_back( @@ -352,7 +370,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( } /// Information about the part. - storage.getCommitPartOps(ops, part, block_id_path); + storage.getCommitPartOps(ops, part, block_id_path, block_entry); MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set. bool renamed = false; @@ -466,13 +484,15 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( if (is_already_existing_part) { /// We get duplicate part without fetch - storage.updateQuorum(part->name); + /// ALEXELEXA + /// should reset here something, after thinking in TODO + storage.updateQuorum(part->name, part->info.block_id); } /// We are waiting for quorum to be satisfied. LOG_TRACE(log, "Waiting for quorum"); - String quorum_status_path = storage.zookeeper_path + "/quorum/status"; + String quorum_status_path = quorum_parallel ? storage.zookeeper_path + "/blocks/" + block_id : storage.zookeeper_path + "/quorum/status"; try { @@ -481,15 +501,25 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( zkutil::EventPtr event = std::make_shared(); std::string value; + ReplicatedMergeTreeQuorumEntry quorum_entry; + ReplicatedMergeTreeBlockEntry block_entry; /// `get` instead of `exists` so that `watch` does not leak if the node is no longer there. if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event)) break; - ReplicatedMergeTreeQuorumEntry quorum_entry(value); - /// If the node has time to disappear, and then appear again for the next insert. - if (quorum_entry.part_name != part->name) - break; + if (quorum_parallel) + { + block_entry.fromString(value); + if (block_entry.part_name != part->name) + break; + } + else + { + quorum_entry.fromString(value); + if (quorum_entry.part_name != part->name) + break; + } if (!event->tryWait(quorum_timeout_ms)) throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index fa3ede20c28..97c094c1128 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -28,6 +28,7 @@ public: size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, + bool quorum_parallel_, bool deduplicate_); Block getHeader() const override; @@ -64,6 +65,7 @@ private: size_t quorum_timeout_ms; size_t max_parts_per_block; + bool quorum_parallel = false; bool deduplicate = true; bool last_block_is_duplicate = false; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 11f23a5c110..4df47915048 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -70,6 +70,8 @@ void ReplicatedMergeTreeCleanupThread::iterate() if (storage.is_leader) { clearOldLogs(); + /// ALEXELEXA + /// may be just remove it? clearOldBlocks(); clearOldMutations(); } @@ -344,6 +346,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() for (auto it = first_outdated_block; it != timed_blocks.end(); ++it) { String path = storage.zookeeper_path + "/blocks/" + it->node; + /// ALEXELEXA + /// should check somehow is it quorum block try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path)); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartHeader.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartHeader.cpp index 88f4a3ec66f..db52f05c0c9 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartHeader.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartHeader.cpp @@ -34,11 +34,24 @@ ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromColumnsAndCheck return ReplicatedMergeTreePartHeader(getSipHash(columns.toString()), std::move(checksums)); } +ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromColumnsChecksumsBlockID( + const NamesAndTypesList & columns, + const MergeTreeDataPartChecksums & full_checksums, + const String & block_id_) +{ + MinimalisticDataPartChecksums checksums; + checksums.computeTotalChecksums(full_checksums); + return ReplicatedMergeTreePartHeader(getSipHash(columns.toString()), std::move(checksums), block_id_); +} + void ReplicatedMergeTreePartHeader::read(ReadBuffer & in) { in >> "part header format version: 1\n"; in.readStrict(columns_hash.data(), columns_hash.size()); checksums.deserializeWithoutHeader(in); + + if (!in.eof()) + in >> "block_id: " >> block_id.value() >> "\n"; } ReplicatedMergeTreePartHeader ReplicatedMergeTreePartHeader::fromString(const String & str) @@ -54,6 +67,8 @@ void ReplicatedMergeTreePartHeader::write(WriteBuffer & out) const writeString("part header format version: 1\n", out); out.write(columns_hash.data(), columns_hash.size()); checksums.serializeWithoutHeader(out); + if (block_id) + out << "block_id " << block_id.value() << "\n"; } String ReplicatedMergeTreePartHeader::toString() const diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartHeader.h b/src/Storages/MergeTree/ReplicatedMergeTreePartHeader.h index 7bc2b72d2d5..d0b8c11dfb6 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartHeader.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartHeader.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -28,6 +29,9 @@ public: static ReplicatedMergeTreePartHeader fromColumnsAndChecksums( const NamesAndTypesList & columns, const MergeTreeDataPartChecksums & full_checksums); + static ReplicatedMergeTreePartHeader fromColumnsChecksumsBlockID( + const NamesAndTypesList & columns, const MergeTreeDataPartChecksums & full_checksums, const String & block_id_); + void read(ReadBuffer & in); static ReplicatedMergeTreePartHeader fromString(const String & str); @@ -38,13 +42,15 @@ public: const MinimalisticDataPartChecksums & getChecksums() const { return checksums; } private: - ReplicatedMergeTreePartHeader(std::array columns_hash_, MinimalisticDataPartChecksums checksums_) - : columns_hash(std::move(columns_hash_)), checksums(std::move(checksums_)) + ReplicatedMergeTreePartHeader(std::array columns_hash_, MinimalisticDataPartChecksums checksums_, + std::optional block_id_ = std::nullopt) + : columns_hash(std::move(columns_hash_)), checksums(std::move(checksums_)), block_id(std::move(block_id_)) { } std::array columns_hash; MinimalisticDataPartChecksums checksums; + std::optional block_id; }; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h b/src/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h index ee12cabb5aa..93e117d82af 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace DB @@ -20,8 +21,7 @@ namespace DB struct ReplicatedMergeTreeQuorumEntry { String part_name; - size_t required_number_of_replicas{}; - std::set replicas; + ReplicatedMergeTreeQuorumStatusEntry status; ReplicatedMergeTreeQuorumEntry() {} ReplicatedMergeTreeQuorumEntry(const String & str) @@ -32,31 +32,15 @@ struct ReplicatedMergeTreeQuorumEntry void writeText(WriteBuffer & out) const { out << "version: 1\n" - << "part_name: " << part_name << "\n" - << "required_number_of_replicas: " << required_number_of_replicas << "\n" - << "actual_number_of_replicas: " << replicas.size() << "\n" - << "replicas:\n"; - - for (const auto & replica : replicas) - out << escape << replica << "\n"; + << "part_name: " << part_name << "\n"; + status.writeText(out); } void readText(ReadBuffer & in) { - size_t actual_number_of_replicas = 0; - in >> "version: 1\n" - >> "part_name: " >> part_name >> "\n" - >> "required_number_of_replicas: " >> required_number_of_replicas >> "\n" - >> "actual_number_of_replicas: " >> actual_number_of_replicas >> "\n" - >> "replicas:\n"; - - for (size_t i = 0; i < actual_number_of_replicas; ++i) - { - String replica; - in >> escape >> replica >> "\n"; - replicas.insert(replica); - } + >> "part_name: " >> part_name >> "\n"; + status.readText(in); } String toString() const diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQuorumStatusEntry.h b/src/Storages/MergeTree/ReplicatedMergeTreeQuorumStatusEntry.h new file mode 100644 index 00000000000..bb3cb83b9a1 --- /dev/null +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQuorumStatusEntry.h @@ -0,0 +1,76 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +/** To implement the functionality of the "quorum write". + * Information about which replicas the inserted part of data appeared on, + * and on how many replicas it should be. + */ +struct ReplicatedMergeTreeQuorumStatusEntry +{ + size_t required_number_of_replicas{}; + std::set replicas; + + ReplicatedMergeTreeQuorumStatusEntry() {} + ReplicatedMergeTreeQuorumStatusEntry(const String & str) + { + fromString(str); + } + + void writeText(WriteBuffer & out) const + { + out << "required_number_of_replicas: " << required_number_of_replicas << "\n" + << "actual_number_of_replicas: " << replicas.size() << "\n" + << "replicas:\n"; + + for (const auto & replica : replicas) + out << escape << replica << "\n"; + } + + void readText(ReadBuffer & in) + { + size_t actual_number_of_replicas = 0; + + in >> "required_number_of_replicas: " >> required_number_of_replicas >> "\n" + >> "actual_number_of_replicas: " >> actual_number_of_replicas >> "\n" + >> "replicas:\n"; + + for (size_t i = 0; i < actual_number_of_replicas; ++i) + { + String replica; + in >> escape >> replica >> "\n"; + replicas.insert(replica); + } + } + + String toString() const + { + WriteBufferFromOwnString out; + writeText(out); + return out.str(); + } + + void fromString(const String & str) + { + ReadBufferFromString in(str); + readText(in); + } + + bool isQuorumReached() + { + return required_number_of_replicas <= replicas.size(); + } +}; + +} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 8424da11896..5a89906dd3f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -227,9 +227,9 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart() if (zookeeper->tryGet(storage.zookeeper_path + "/quorum/status", quorum_str)) { ReplicatedMergeTreeQuorumEntry quorum_entry; - quorum_entry.fromString(quorum_str); + quorum_entry.status.fromString(quorum_str); - if (!quorum_entry.replicas.count(storage.replica_name) + if (!quorum_entry.status.replicas.count(storage.replica_name) && zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name)) { LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", quorum_entry.part_name); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 65c0c5ac313..1df419b6dac 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1176,6 +1176,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil: { ops.emplace_back(zkutil::makeCreateRequest( part_path, local_part_header.toString(), zkutil::CreateMode::Persistent)); + LOG_DEBUG(log, "local_part_header.toString(): {}", local_part_header.toString()); } else { @@ -3028,7 +3029,7 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart( /** If a quorum is tracked for a part, update information about it in ZK. */ -void StorageReplicatedMergeTree::updateQuorum(const String & part_name) +void StorageReplicatedMergeTree::updateQuorum(const String & part_name, const std::optional & block_id) { auto zookeeper = getZooKeeper(); @@ -3036,25 +3037,44 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name) const String quorum_status_path = zookeeper_path + "/quorum/status"; /// The name of the previous part for which the quorum was reached. const String quorum_last_part_path = zookeeper_path + "/quorum/last_part"; - - String value; - Coordination::Stat stat; + const String quorum_parallel_status_path = block_id ? zookeeper_path + "/blocks/" + *block_id : ""; /// If there is no node, then all quorum INSERTs have already reached the quorum, and nothing is needed. - while (zookeeper->tryGet(quorum_status_path, value, &stat)) + String value; + Coordination::Stat stat; + while (true) { + bool is_parallel = false; + ReplicatedMergeTreeBlockEntry block_entry; ReplicatedMergeTreeQuorumEntry quorum_entry; - quorum_entry.fromString(value); - if (quorum_entry.part_name != part_name) + if (zookeeper->tryGet(quorum_status_path, value, &stat)) + { + quorum_entry.fromString(value); + quorum_entry.status.replicas.insert(replica_name); + } + else if (block_id && zookeeper->tryGet(quorum_parallel_status_path, value, &stat)) + { + block_entry.fromString(value); + // The quorum has already been achieved + if (!block_entry.quorum_status) + break; + + is_parallel = true; + block_entry.quorum_status->replicas.insert(replica_name); + if (block_entry.quorum_status->isQuorumReached()) + block_entry.quorum_status.reset(); + } + else + break; + + if (quorum_entry.part_name != part_name && block_entry.part_name != part_name) { /// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started. break; } - quorum_entry.replicas.insert(replica_name); - - if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas) + if (quorum_entry.status.isQuorumReached()) { /// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum. @@ -3098,8 +3118,16 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name) } else { - /// We update the node, registering there one more replica. - auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version); + Coordination::Error code; + if (is_parallel) + { + /// We update the node, registering there one more replica. + code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version); + } + else { + /// Update parallel quorum. It also might be reached here. + code = zookeeper->trySet(quorum_parallel_status_path, block_entry.toString(), stat.version); + } if (code == Coordination::Error::ZOK) { @@ -3588,6 +3616,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, *this, metadata_snapshot, query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, + query_settings.insert_quorum_parallel, deduplicate); } @@ -4164,7 +4193,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition( PartsTemporaryRename renamed_parts(*this, "detached/"); MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts); - ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false); /// TODO Allow to use quorum here. + ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false); /// TODO Allow to use quorum here. for (size_t i = 0; i < loaded_parts.size(); ++i) { String old_name = loaded_parts[i]->name; @@ -5242,6 +5271,8 @@ void StorageReplicatedMergeTree::clearBlocksInPartition( if (result.error == Coordination::Error::ZNONODE) continue; + /// ALEXELEXA + /// should parse it here using another way ReadBufferFromString buf(result.data); Int64 block_num = 0; bool parsed = tryReadIntText(block_num, buf) && buf.eof(); @@ -5661,18 +5692,19 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta void StorageReplicatedMergeTree::getCommitPartOps( Coordination::Requests & ops, MutableDataPartPtr & part, - const String & block_id_path) const + const String & block_id_path, + ReplicatedMergeTreeBlockEntry block_entry) const { - const String & part_name = part->name; const auto storage_settings_ptr = getSettings(); if (!block_id_path.empty()) { /// Make final duplicate check and commit block_id + block_entry.part_name = part->name; ops.emplace_back( zkutil::makeCreateRequest( block_id_path, - part_name, /// We will be able to know original part number for duplicate blocks, if we want. + block_entry.toString(), /// We will be able to know original part number for duplicate blocks, if we want. zkutil::CreateMode::Persistent)); } @@ -5683,6 +5715,8 @@ void StorageReplicatedMergeTree::getCommitPartOps( replica_path + "/parts/" + part->name, ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString(), zkutil::CreateMode::Persistent)); + LOG_DEBUG(log, "ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString(): {}", + ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(part->getColumns(), part->checksums).toString()); } else { diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2bc9265331d..daf8b712a6c 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -350,7 +351,8 @@ private: bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; - void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const; + void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "", + ReplicatedMergeTreeBlockEntry block_entry = ReplicatedMergeTreeBlockEntry()) const; /// Adds actions to `ops` that remove a part from ZooKeeper. /// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes). @@ -492,7 +494,7 @@ private: /// With the quorum being tracked, add a replica to the quorum for the part. - void updateQuorum(const String & part_name); + void updateQuorum(const String & part_name, const std::optional & block_id = std::nullopt); /// Deletes info from quorum/last_part node for particular partition_id. void cleanLastPartNode(const String & partition_id);