diff --git a/S3ZeroCopyReplication.md b/S3ZeroCopyReplication.md
index bfb39addcd2..5230640ebcc 100644
--- a/S3ZeroCopyReplication.md
+++ b/S3ZeroCopyReplication.md
@@ -37,7 +37,7 @@
 
 In hybrid storage, when a part is moved to S3, the node checks via ZooKeeper whether the part has already been moved by another node; if it has, it performs a fetch (modified compared to the regular fetch).
 
-A flag that enables the new replication protocol has been added to the config: merge_tree->allow_s3_zero_copy_replication. It is currently set to true; this is temporary, so that all tests run with the flag enabled, and it must be switched back to false before the final merge.
+A flag that enables the new replication protocol has been added to the config: merge_tree->allow_s3_zero_copy_replication. It is currently set to false.
 
 ## Hacks and rough edges, of which there are many
 
diff --git a/src/Disks/DiskCacheWrapper.cpp b/src/Disks/DiskCacheWrapper.cpp
index c26fa7623a4..df30af769e1 100644
--- a/src/Disks/DiskCacheWrapper.cpp
+++ b/src/Disks/DiskCacheWrapper.cpp
@@ -278,11 +278,11 @@ void DiskCacheWrapper::removeRecursive(const String & path)
     DiskDecorator::removeRecursive(path);
 }
 
-void DiskCacheWrapper::removeShared(const String & path, bool keep_s3)
+void DiskCacheWrapper::removeSharedFile(const String & path, bool keep_s3)
 {
     if (cache_disk->exists(path))
-        cache_disk->removeShared(path, keep_s3);
-    DiskDecorator::removeShared(path, keep_s3);
+        cache_disk->removeSharedFile(path, keep_s3);
+    DiskDecorator::removeSharedFile(path, keep_s3);
 }
 
 void DiskCacheWrapper::removeSharedRecursive(const String & path, bool keep_s3)
diff --git a/src/Disks/DiskCacheWrapper.h b/src/Disks/DiskCacheWrapper.h
index fc7ccaaa345..8995bf1936d 100644
--- a/src/Disks/DiskCacheWrapper.h
+++ b/src/Disks/DiskCacheWrapper.h
@@ -41,7 +41,7 @@ public:
     void removeFileIfExists(const String & path) override;
     void removeDirectory(const String & path) override;
     void removeRecursive(const String & path) override;
-    void removeShared(const String & path, bool keep_s3) override;
+    void removeSharedFile(const String & path, bool keep_s3) override;
     void removeSharedRecursive(const String & path, bool keep_s3) override;
     void createHardLink(const String & src_path, const String & dst_path) override;
     ReservationPtr reserve(UInt64 bytes) override;
diff --git a/src/Disks/DiskDecorator.cpp b/src/Disks/DiskDecorator.cpp
index 9c8c7859b8b..96d2e8278e3 100644
--- a/src/Disks/DiskDecorator.cpp
+++ b/src/Disks/DiskDecorator.cpp
@@ -150,9 +150,9 @@ void DiskDecorator::removeRecursive(const String & path)
     delegate->removeRecursive(path);
 }
 
-void DiskDecorator::removeShared(const String & path, bool keep_s3)
+void DiskDecorator::removeSharedFile(const String & path, bool keep_s3)
 {
-    delegate->removeShared(path, keep_s3);
+    delegate->removeSharedFile(path, keep_s3);
 }
 
 void DiskDecorator::removeSharedRecursive(const String & path, bool keep_s3)
diff --git a/src/Disks/DiskDecorator.h b/src/Disks/DiskDecorator.h
index edba993639a..d069f8a84b6 100644
--- a/src/Disks/DiskDecorator.h
+++ b/src/Disks/DiskDecorator.h
@@ -43,7 +43,7 @@ public:
     void removeFileIfExists(const String & path) override;
     void removeDirectory(const String & path) override;
     void removeRecursive(const String & path) override;
-    void removeShared(const String & path, bool keep_s3) override;
+    void removeSharedFile(const String & path, bool keep_s3) override;
     void removeSharedRecursive(const String & path, bool keep_s3) override;
     void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
     Poco::Timestamp getLastModified(const String & path) override;
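
The removeShared* family is renamed to removeSharedFile/removeSharedRecursive across the whole disk hierarchy; the keep_s3 flag is what lets a zero-copy replica drop its local metadata without deleting objects that other replicas still reference in S3. A minimal sketch of the intended contract, assuming (this part is not in the diff) that the IDisk defaults simply ignore keep_s3 on disks that have no shared storage:

#include <string>

using String = std::string;

/// Sketch only: illustrates the expected contract of the renamed methods,
/// not the real IDisk declaration.
struct IDiskRemovalSketch
{
    virtual void removeFile(const String & path) = 0;
    virtual void removeRecursive(const String & path) = 0;

    /// On ordinary disks nothing is shared, so keep_s3 is ignored.
    /// DiskS3 is expected to override these and, when keep_s3 is true,
    /// remove only the local metadata files while keeping the S3 objects.
    virtual void removeSharedFile(const String & path, bool /*keep_s3*/) { removeFile(path); }
    virtual void removeSharedRecursive(const String & path, bool /*keep_s3*/) { removeRecursive(path); }

    virtual ~IDiskRemovalSketch() = default;
};
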
diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h
index 612c5ef88ee..a5a886c9c9f 100644
--- a/src/Disks/IDisk.h
+++ b/src/Disks/IDisk.h
@@ -201,10 +201,10 @@ public:
     /// Invoked when Global Context is shutdown.
     virtual void shutdown() { }
 
-    /// Return some uniq string for file, overrided for S3
+    /// Return some unique string for file, overridden for S3
    virtual String getUniqueId(const String & path) const { return path; }
 
-    /// Check file, overrided for S3 only
+    /// Check file, overridden for S3 only
     virtual bool checkUniqueId(const String & id) const { return exists(id); }
 
     /// Returns executor to perform asynchronous operations.
diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp
index bbedb2af8f6..aadfcfa82d6 100644
--- a/src/Disks/S3/DiskS3.cpp
+++ b/src/Disks/S3/DiskS3.cpp
@@ -935,8 +935,6 @@ bool DiskS3::checkUniqueId(const String & id) const
     throwIfError(resp);
 
     Aws::Vector<Aws::S3::Model::Object> object_list = resp.GetResult().GetContents();
-    if (object_list.size() < 1)
-        return false;
     for (const auto & object : object_list)
         if (object.GetKey() == id)
             return true;
diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h
index acfb75f681d..165f09ff1e4 100644
--- a/src/Disks/S3/DiskS3.h
+++ b/src/Disks/S3/DiskS3.h
@@ -118,7 +118,7 @@ public:
 
     String getUniqueId(const String & path) const override;
 
-    bool checkUniqueId(const String & path) const override;
+    bool checkUniqueId(const String & id) const override;
 
 private:
     bool tryReserve(UInt64 bytes);
diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index f2ae78c85ce..7041cfd5ad2 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -9,7 +9,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -619,11 +618,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
 
     DiskPtr disk = disks_s3[0];
 
-    for (const auto & disk_ : disks_s3)
+    for (const auto & disk_s3 : disks_s3)
     {
-        if (disk_->checkUniqueId(part_id))
+        if (disk_s3->checkUniqueId(part_id))
         {
-            disk = disk_;
+            disk = disk_s3;
             break;
         }
     }
@@ -662,7 +661,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
     String metadata_file = fullPath(disk, data_path);
 
     {
-        auto file_out = createWriteBufferFromFileBase(metadata_file, 0, 0, DBMS_DEFAULT_BUFFER_SIZE, -1);
+        auto file_out = std::make_unique<WriteBufferFromFile>(metadata_file, DBMS_DEFAULT_BUFFER_SIZE, -1, 0666, nullptr, 0);
 
         HashingWriteBuffer hashing_out(*file_out);
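
getUniqueId()/checkUniqueId() are what let the receiving replica discover that a part's data is already present in its S3 bucket: for DiskS3 the unique id is the S3 key of one of the part's objects, and checkUniqueId() lists the bucket and looks for that key. A condensed sketch of how downloadPartToS3 uses this to pick a disk (an illustration following the names in this diff, not the full implementation):

#include <Disks/IDisk.h>

using namespace DB;

/// Probe every S3 disk of the storage policy with the part id sent by the
/// source replica and prefer a disk that already holds an object with that
/// key, so the part's data itself never has to be transferred.
static DiskPtr chooseDiskForSharedPart(const Disks & disks_s3, const String & part_id)
{
    DiskPtr disk = disks_s3[0];                 /// fallback: the first S3 disk

    for (const auto & disk_s3 : disks_s3)
    {
        if (disk_s3->checkUniqueId(part_id))    /// the object is already there
        {
            disk = disk_s3;
            break;
        }
    }
    return disk;
}
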
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index 89ec68e5068..69710311af3 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -1132,7 +1132,9 @@ void IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & di
 
     if (disk->getType() == "s3")
     {
-        is_fetched = tryToFetchIfShared(disk, path_to_clone + "/" + name);
+        auto data_settings = storage.getSettings();
+        if (data_settings->allow_s3_zero_copy_replication)
+            is_fetched = tryToFetchIfShared(disk, path_to_clone + "/" + name);
     }
 
     if (!is_fetched)
@@ -1301,8 +1303,23 @@ void IMergeTreeDataPart::lockSharedData() const
 
     LOG_TRACE(storage.log, "Set zookeeper lock {}", zookeeper_node);
 
-    zk.zookeeper->createAncestors(zookeeper_node);
-    zk.zookeeper->createIfNotExists(zookeeper_node, "lock");
+    /// In a rare case another replica can remove the path between createAncestors and createIfNotExists,
+    /// so we make up to 5 attempts.
+    for (int attempts = 5; attempts > 0; --attempts)
+    {
+        try
+        {
+            zk.zookeeper->createAncestors(zookeeper_node);
+            zk.zookeeper->createIfNotExists(zookeeper_node, "lock");
+            break;
+        }
+        catch (const zkutil::KeeperException & e)
+        {
+            if (e.code == Coordination::Error::ZNONODE)
+                continue;
+            throw;
+        }
+    }
 }
 
 bool IMergeTreeDataPart::unlockSharedData() const
@@ -1476,7 +1493,7 @@ bool IMergeTreeDataPart::tryToFetchIfShared(const DiskPtr & disk, const String &
     log_entry.disk = disk;
     log_entry.path = path;
 
-    /// TODO: !!! Fix const usage !!!
+    /// TODO: Fix const usage
     StorageReplicatedMergeTree *replicated_storage_nc = const_cast<StorageReplicatedMergeTree *>(replicated_storage);
 
     return replicated_storage_nc->executeFetchShared(log_entry);
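
Both lockSharedData() above and tryExecuteMerge() below have to create their ZooKeeper lock node in two steps, createAncestors() followed by the create itself, and another replica may delete the freshly created ancestors in between, which surfaces as ZNONODE. The fix in both places is a bounded retry; a hypothetical shared helper (not part of this diff) would look roughly like this:

#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>

using namespace DB;

/// Hypothetical helper showing the retry pattern used in lockSharedData():
/// if another replica removes the ancestor path between createAncestors()
/// and createIfNotExists(), the resulting ZNONODE is retried a bounded
/// number of times instead of failing the whole operation.
static void createZeroCopyLockNode(zkutil::ZooKeeper & zookeeper, const String & zookeeper_node)
{
    for (int attempts = 5; attempts > 0; --attempts)
    {
        try
        {
            zookeeper.createAncestors(zookeeper_node);
            zookeeper.createIfNotExists(zookeeper_node, "lock");
            return;
        }
        catch (const zkutil::KeeperException & e)
        {
            if (e.code == Coordination::Error::ZNONODE && attempts > 1)
                continue;   /// the ancestors were removed under us, try again
            throw;
        }
    }
}
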
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index 68c69c3687e..ab6e2cc995e 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -108,7 +108,7 @@ struct Settings;
     M(UInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.", 0) \
     M(String, storage_policy, "default", "Name of storage disk policy", 0) \
     M(Bool, allow_nullable_key, false, "Allow Nullable types as primary keys.", 0) \
-    M(Bool, allow_s3_zero_copy_replication, true, "Allow Zero-copy replication over S3", 0) \
+    M(Bool, allow_s3_zero_copy_replication, false, "Allow Zero-copy replication over S3", 0) \
     M(Bool, remove_empty_parts, true, "Remove empty parts after they were pruned by TTL, mutation, or collapsing merge algorithm", 0) \
     M(Bool, assign_part_uuids, false, "Generate UUIDs for parts. Before enabling check that all replicas support new format.", 0) \
     M(Int64, max_partitions_to_read, -1, "Limit the max number of partitions that can be accessed in one query. <= 0 means unlimited. This setting is the default that can be overridden by the query-level setting with the same name.", 0) \
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index c3b8731cbe8..2002c124a66 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1498,11 +1498,28 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
         {
             auto zookeeper = getZooKeeper();
             String zookeeper_node = zookeeper_path + "/zero_copy_s3/merged/" + entry.new_part_name;
-            zookeeper->createAncestors(zookeeper_node);
-            auto code = zookeeper->tryCreate(zookeeper_node, "lock", zkutil::CreateMode::Ephemeral);
-            /// Someone else created or started create this merge
-            if (code == Coordination::Error::ZNODEEXISTS)
-                return false;
+
+            /// In a rare case another replica can remove the path between createAncestors and tryCreate,
+            /// so we make up to 5 attempts to take the lock.
+            for (int attempts = 5; attempts > 0; --attempts)
+            {
+                try
+                {
+                    zookeeper->createAncestors(zookeeper_node);
+                    auto code = zookeeper->tryCreate(zookeeper_node, "lock", zkutil::CreateMode::Ephemeral);
+                    /// Someone else created or started creating this merge
+                    if (code == Coordination::Error::ZNODEEXISTS)
+                        return false;
+                    if (code != Coordination::Error::ZNONODE)
+                        break;
+                }
+                catch (const zkutil::KeeperException & e)
+                {
+                    if (e.code == Coordination::Error::ZNONODE)
+                        continue;
+                    throw;
+                }
+            }
         }
     }
 
@@ -1930,7 +1947,7 @@ bool StorageReplicatedMergeTree::executeFetchShared(ReplicatedMergeTreeLogEntry
 
     try
     {
-        if (!fetchPart(entry.new_part_name, metadata_snapshot, zookeeper_path + "/replicas/" + entry.source_replica, false, entry.quorum,
+        if (!fetchPart(entry.new_part_name, metadata_snapshot, zookeeper_path + "/replicas/" + entry.source_replica, false, entry.quorum,
                 nullptr, true, entry.disk, entry.path))
             return false;
     }
@@ -3624,7 +3641,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
     {
         if (part->volume->getDisk()->getName() != replaced_disk->getName())
             throw Exception("Part " + part->name + " fetched on wrong disk " + part->volume->getDisk()->getName(), ErrorCodes::LOGICAL_ERROR);
-        replaced_disk->removeIfExists(replaced_part_path);
+        replaced_disk->removeFileIfExists(replaced_part_path);
         replaced_disk->moveDirectory(part->getFullRelativePath(), replaced_part_path);
     }
     else
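
The ephemeral node under zookeeper_path + "/zero_copy_s3/merged/<part_name>" is what makes a merge of a shared part run on exactly one replica: whoever creates the node first performs the merge, everyone else gets ZNODEEXISTS, skips the merge and will get the merged part by fetch instead. A condensed sketch of that gate, reusing the same ZNONODE retry as above (illustrative only, not the literal code from tryExecuteMerge()):

#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>

using namespace DB;

/// Returns true if this replica acquired the right to perform the merge of
/// new_part_name, false if another replica already created (or is creating) it.
static bool tryLockSharedMerge(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path, const String & new_part_name)
{
    const String zookeeper_node = zookeeper_path + "/zero_copy_s3/merged/" + new_part_name;

    for (int attempts = 5; attempts > 0; --attempts)
    {
        try
        {
            zookeeper.createAncestors(zookeeper_node);
            auto code = zookeeper.tryCreate(zookeeper_node, "lock", zkutil::CreateMode::Ephemeral);

            if (code == Coordination::Error::ZNODEEXISTS)
                return false;   /// someone else created or started creating this merge
            if (code != Coordination::Error::ZNONODE)
                return true;    /// ephemeral lock taken, this replica runs the merge
            /// ZNONODE: the ancestors disappeared under us, retry
        }
        catch (const zkutil::KeeperException & e)
        {
            if (e.code != Coordination::Error::ZNONODE)
                throw;
        }
    }
    return true;    /// mirrors the original code: stop retrying and proceed with the merge
}
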