diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index 5ab0cc4612f..1958bb4101a 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -45,6 +45,7 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = 1;
 constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2;
 constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE = 3;
 constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION = 4;
+constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
 
 
 std::string getEndpointId(const std::string & node_id)
@@ -109,7 +110,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
     }
 
     /// We pretend to work as older server version, to be sure that client will correctly process our version
-    response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION))});
+    response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID))});
 
     ++total_sends;
     SCOPE_EXIT({--total_sends;});
@@ -142,8 +143,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
             sendPartFromMemory(part, out);
         else
         {
-            bool send_default_compression_file = client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION;
-            sendPartFromDisk(part, out, send_default_compression_file);
+            sendPartFromDisk(part, out, client_protocol_version);
         }
     }
     catch (const NetException &)
@@ -176,7 +176,7 @@ void Service::sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteB
     block_out.write(part_in_memory->block);
 }
 
-void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_default_compression_file)
+void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, int client_protocol_version)
 {
     /// We'll take a list of files from the list of checksums.
     MergeTreeData::DataPart::Checksums checksums = part->checksums;
@@ -184,8 +184,12 @@ void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuf
     auto file_names_without_checksums = part->getFileNamesWithoutChecksums();
     for (const auto & file_name : file_names_without_checksums)
     {
-        if (!send_default_compression_file && file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
+        if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION && file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
             continue;
+
+        if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID && file_name == IMergeTreeDataPart::UUID_FILE_NAME)
+            continue;
+
         checksums.files[file_name] = {};
     }
 
@@ -263,7 +267,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     {
         {"endpoint",                getEndpointId(replica_path)},
         {"part",                    part_name},
-        {"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION)},
+        {"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)},
         {"compress",                "false"}
     });
 
@@ -430,7 +434,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
 
         if (file_name != "checksums.txt" &&
             file_name != "columns.txt" &&
-            file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
+            file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME &&
+            file_name != IMergeTreeDataPart::UUID_FILE_NAME)
             checksums.addFile(file_name, file_size, expected_hash);
 
         if (sync)
diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h
index 52a34a2239a..063d3e91d1a 100644
--- a/src/Storages/MergeTree/DataPartsExchange.h
+++ b/src/Storages/MergeTree/DataPartsExchange.h
@@ -32,7 +32,7 @@ public:
 private:
     MergeTreeData::DataPartPtr findPart(const String & name);
     void sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out);
-    void sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_default_compression_file);
+    void sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, int client_protocol_version);
 
 private:
     /// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index ffc2dd62ce0..38d2e712ec3 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -410,6 +410,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
     /// Motivation: memory for index is shared between queries - not belong to the query itself.
     MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
 
+    loadUUID();
     loadColumns(require_columns_checksums);
     loadChecksums(require_columns_checksums);
     loadIndexGranularity();
@@ -482,9 +483,14 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
     NameSet result = {"checksums.txt", "columns.txt"};
 
     String default_codec_path = getFullRelativePath() + DEFAULT_COMPRESSION_CODEC_FILE_NAME;
+
     if (volume->getDisk()->exists(default_codec_path))
         result.emplace(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
 
+    String uuid_path = getFullRelativePath() + UUID_FILE_NAME;
+    if (volume->getDisk()->exists(uuid_path))
+        result.emplace(UUID_FILE_NAME);
+
     return result;
 }
 
@@ -726,6 +732,19 @@ void IMergeTreeDataPart::loadTTLInfos()
     }
 }
 
+void IMergeTreeDataPart::loadUUID()
+{
+    String path = getFullRelativePath() + UUID_FILE_NAME;
+
+    if (volume->getDisk()->exists(path))
+    {
+        auto in = openForReading(volume->getDisk(), path);
+        readText(uuid, *in);
+        if (uuid == UUIDHelpers::Nil)
+            throw Exception("Unexpected empty " + String(UUID_FILE_NAME) + " in part: " + name, ErrorCodes::LOGICAL_ERROR);
+    }
+}
+
 void IMergeTreeDataPart::loadColumns(bool require)
 {
     String path = getFullRelativePath() + "columns.txt";
@@ -894,6 +913,7 @@ void IMergeTreeDataPart::remove() const
 
             for (const auto & file : {"checksums.txt", "columns.txt"})
                 volume->getDisk()->remove(to + "/" + file);
+            volume->getDisk()->removeIfExists(to + "/" + UUID_FILE_NAME);
             volume->getDisk()->removeIfExists(to + "/" + DEFAULT_COMPRESSION_CODEC_FILE_NAME);
             volume->getDisk()->removeIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME);
 
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h
index 21932ba445c..284ec667d0a 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.h
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -164,6 +164,11 @@ public:
     String name;
     MergeTreePartInfo info;
 
+    /// Part unique identifier.
+    /// The intention is to use it for identifying cases where the same part is
+    /// processed by multiple shards.
+    UUID uuid;
+
     VolumePtr volume;
 
     /// A directory path (relative to storage's path) where part data is actually stored
@@ -348,6 +353,8 @@ public:
 
     static inline constexpr auto DELETE_ON_DESTROY_MARKER_FILE_NAME = "delete-on-destroy.txt";
 
+    static inline constexpr auto UUID_FILE_NAME = "uuid.txt";
+
     /// Checks that all TTLs (table min/max, column ttls, so on) for part
     /// calculated. Part without calculated TTL may exist if TTL was added after
     /// part creation (using alter query with materialize_ttl setting).
@@ -384,6 +391,9 @@ private:
     /// In compact parts order of columns is necessary
     NameToPosition column_name_to_position;
 
+    /// Reads part unique identifier (if exists) from uuid.txt
+    void loadUUID();
+
     /// Reads columns names and types from columns.txt
     void loadColumns(bool require);
 
diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
index 055a5ec7b7a..e889056cd75 100644
--- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
@@ -1138,6 +1138,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
         future_part.name, future_part.type, future_part.part_info, single_disk_volume, "tmp_mut_" + future_part.name);
 
     new_data_part->is_temp = true;
+    new_data_part->uuid = source_part->uuid;
     new_data_part->ttl_infos = source_part->ttl_infos;
 
     /// It shouldn't be changed by mutation.
@@ -1818,6 +1819,13 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
     const CompressionCodecPtr & codec)
 {
     auto disk = new_data_part->volume->getDisk();
+
+    if (new_data_part->uuid != UUIDHelpers::Nil)
+    {
+        auto out = disk->writeFile(new_data_part->getFullRelativePath() + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
+        writeUUIDText(new_data_part->uuid, *out);
+    }
+
     if (need_remove_expired_values)
     {
         /// Write a file with ttl infos in json format.
diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp
index 1b40f9ab292..83c59780a0b 100644
--- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp
@@ -133,6 +133,15 @@ void MergedBlockOutputStream::finalizePartOnDisk(
     MergeTreeData::DataPart::Checksums & checksums,
     bool sync)
 {
+    if (new_part->uuid != UUIDHelpers::Nil)
+    {
+        auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
+        writeUUIDText(new_part->uuid, *out);
+        out->finalize();
+        if (sync)
+            out->sync();
+    }
+
     if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
     {
         new_part->partition.store(storage, volume->getDisk(), part_path, checksums);
diff --git a/src/Storages/System/StorageSystemParts.cpp b/src/Storages/System/StorageSystemParts.cpp
index 757aea661e2..7ae20ed024e 100644
--- a/src/Storages/System/StorageSystemParts.cpp
+++ b/src/Storages/System/StorageSystemParts.cpp
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include <DataTypes/DataTypeUUID.h>
 #include
 #include
 #include
@@ -20,6 +21,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
     {
         {"partition", std::make_shared<DataTypeString>()},
         {"name", std::make_shared<DataTypeString>()},
+        {"uuid", std::make_shared<DataTypeUUID>()},
         {"part_type", std::make_shared<DataTypeString>()},
         {"active", std::make_shared<DataTypeUInt8>()},
         {"marks", std::make_shared<DataTypeUInt64>()},
@@ -93,6 +95,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
             columns_[i++]->insert(out.str());
         }
         columns_[i++]->insert(part->name);
+        columns_[i++]->insert(part->uuid);
         columns_[i++]->insert(part->getTypeName());
         columns_[i++]->insert(part_state == State::Committed);
         columns_[i++]->insert(part->getMarksCount());
diff --git a/tests/integration/test_part_uuid/__init__.py b/tests/integration/test_part_uuid/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_part_uuid/configs/remote_servers.xml b/tests/integration/test_part_uuid/configs/remote_servers.xml
new file mode 100644
index 00000000000..998a969e87e
--- /dev/null
+++ b/tests/integration/test_part_uuid/configs/remote_servers.xml
@@ -0,0 +1,18 @@
+<yandex>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</yandex>
\ No newline at end of file
diff --git a/tests/integration/test_part_uuid/test.py b/tests/integration/test_part_uuid/test.py
new file mode 100644
index 00000000000..a00575c2504
--- /dev/null
+++ b/tests/integration/test_part_uuid/test.py
@@ -0,0 +1,53 @@
+import time
+import uuid
+
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+
+cluster = ClickHouseCluster(__file__)
+
+node1 = cluster.add_instance(
+    'node1',
+    main_configs=['configs/remote_servers.xml'],
+    with_zookeeper=True)
+
+node2 = cluster.add_instance(
+    'node2',
+    main_configs=['configs/remote_servers.xml'],
+    with_zookeeper=True)
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def test_part_uuid(started_cluster):
+    test_uuid = str(uuid.uuid4())
+
+    for ix, n in enumerate([node1, node2]):
+        n.query("""
+        CREATE TABLE t(key UInt64, value UInt64)
+        ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}')
+        ORDER BY tuple()
+        """.format(ix))
+
+    node1.query("INSERT INTO t VALUES (1, 1)")
+
+    node1.query("ALTER TABLE t DETACH PARTITION tuple()")
+    node1.exec_in_container([
+        "bash", "-c",
+        "echo '{}' > /var/lib/clickhouse/data/default/t/detached/{}/uuid.txt".format(test_uuid, "all_0_0_0")
+    ])
+    node1.query("ALTER TABLE t ATTACH PARTITION tuple()")
+    node1.query("ALTER TABLE t UPDATE value = 1 WHERE key = 1")
+
+    assert test_uuid == node1.query("SELECT uuid FROM system.parts WHERE table = 't' AND active ORDER BY name").strip()
+
+    node2.query("SYSTEM SYNC REPLICA t")
+    assert test_uuid == node2.query("SELECT uuid FROM system.parts WHERE table = 't' AND active ORDER BY name").strip()
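The compatibility rule introduced above (a server never sends uuid.txt to a client whose negotiated protocol version is below REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID, and never advertises a higher version than the client asked for) can be illustrated with a minimal standalone C++ sketch. It is not ClickHouse code: filesToSend, the std::set stand-in for the list of files without checksums, and the default_compression_codec.txt file name are assumptions made to keep the example self-contained; only uuid.txt and the version numbers 4 and 5 come from the patch.

// Standalone sketch of the protocol-version gating used in Service::sendPartFromDisk.
// filesToSend and the literal file names are illustrative placeholders, not ClickHouse APIs.
#include <iostream>
#include <set>
#include <string>

constexpr int PROTOCOL_WITH_PARTS_DEFAULT_COMPRESSION = 4;
constexpr int PROTOCOL_WITH_PARTS_UUID = 5;

const std::string DEFAULT_COMPRESSION_CODEC_FILE_NAME = "default_compression_codec.txt"; // assumed name
const std::string UUID_FILE_NAME = "uuid.txt";

// Return the subset of "files without checksums" that a client which negotiated
// the given protocol version is able to understand.
std::set<std::string> filesToSend(const std::set<std::string> & extra_files, int client_protocol_version)
{
    std::set<std::string> result;
    for (const auto & file_name : extra_files)
    {
        // Clients below version 4 do not know about the default compression codec file.
        if (client_protocol_version < PROTOCOL_WITH_PARTS_DEFAULT_COMPRESSION
            && file_name == DEFAULT_COMPRESSION_CODEC_FILE_NAME)
            continue;

        // Clients below version 5 would treat uuid.txt as an unexpected file, so it is skipped.
        if (client_protocol_version < PROTOCOL_WITH_PARTS_UUID && file_name == UUID_FILE_NAME)
            continue;

        result.insert(file_name);
    }
    return result;
}

int main()
{
    const std::set<std::string> extra = {"checksums.txt", "columns.txt",
                                         DEFAULT_COMPRESSION_CODEC_FILE_NAME, UUID_FILE_NAME};

    // A version-4 client gets the codec file but not uuid.txt; a version-5 client gets both.
    for (const auto & f : filesToSend(extra, 4))
        std::cout << "v4: " << f << '\n';
    for (const auto & f : filesToSend(extra, 5))
        std::cout << "v5: " << f << '\n';
}

Combined with the std::min cap on server_protocol_version in Service::processQuery, both sides settle on the lowest protocol version either of them understands, so an upgraded replica can still serve parts to an older one without shipping files the older replica cannot handle.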