diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp
index 913fd76bf8a..db8f90e777d 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp
@@ -282,7 +282,10 @@ String DiskObjectStorage::getUniqueId(const String & path) const
 bool DiskObjectStorage::checkUniqueId(const String & id) const
 {
     if (!id.starts_with(object_storage_root_path))
+    {
+        LOG_DEBUG(log, "Blob with id {} doesn't start with blob storage prefix {}", id, object_storage_root_path);
         return false;
+    }

     auto object = StoredObject::create(*object_storage, id, {}, {}, true);
     return object_storage->exists(object);
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index cc46af361cd..b09abda85db 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -346,9 +346,10 @@ void WriteBufferFromS3::completeMultipartUpload()
         LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, tags.size());
     else
     {
-        throw Exception(ErrorCodes::S3_ERROR, "{} Tags:{}",
-            outcome.GetError().GetMessage(),
-            fmt::join(tags.begin(), tags.end(), " "));
+        throw S3Exception(
+            outcome.GetError().GetErrorType(),
+            "Message: {}, Key: {}, Bucket: {}, Tags: {}",
+            outcome.GetError().GetMessage(), key, bucket, fmt::join(tags.begin(), tags.end(), " "));
     }
 }

@@ -433,7 +434,10 @@ void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
     if (outcome.IsSuccess())
         LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
     else
-        throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
+        throw S3Exception(
+            outcome.GetError().GetErrorType(),
+            "Message: {}, Key: {}, Bucket: {}, Object size: {}, WithPool: {}",
+            outcome.GetError().GetMessage(), key, bucket, task.req.GetContentLength(), with_pool);
 }

 void WriteBufferFromS3::waitForReadyBackGroundTasks()
diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index 9f8313a4700..ecb692aaf9b 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -399,7 +399,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
     throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
 }

-MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
+MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
     const StorageMetadataPtr & metadata_snapshot,
     ContextPtr context,
     const String & part_name,
@@ -420,6 +420,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     if (blocker.isCancelled())
         throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);

+    const auto data_settings = data.getSettings();
+
+    if (data_settings->allow_remote_fs_zero_copy_replication && !try_zero_copy)
+        LOG_WARNING(log, "Zero copy replication enabled, but trying to fetch part {} without zero copy", part_name);
+
     /// It should be "tmp-fetch_" and not "tmp_fetch_", because we can fetch part to detached/,
     /// but detached part name prefix should not contain underscore.
     static const String TMP_PREFIX = "tmp-fetch_";
@@ -429,7 +434,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(

     /// Validation of the input that may come from malicious replica.
     auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
-    const auto data_settings = data.getSettings();

     Poco::URI uri;
     uri.setScheme(interserver_scheme);
@@ -465,6 +469,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
                 capability.push_back(toString(disk->getDataSourceDescription().type));
         }
     }
+
     if (!capability.empty())
     {
         ::sort(capability.begin(), capability.end());
@@ -474,6 +479,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     }
     else
     {
+        if (data_settings->allow_remote_fs_zero_copy_replication)
+            LOG_WARNING(log, "Cannot select any zero-copy disk for {}", part_name);
+
         try_zero_copy = false;
     }

@@ -585,7 +593,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
             temporary_directory_lock = {};

             /// Try again but without zero-copy
-            return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
+            return fetchSelectedPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
                 user, password, interserver_scheme, throttler, to_detached, tmp_prefix, nullptr, false, disk);
         }
     }
diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h
index 0e19bf4cdcd..e2582c42dfb 100644
--- a/src/Storages/MergeTree/DataPartsExchange.h
+++ b/src/Storages/MergeTree/DataPartsExchange.h
@@ -66,7 +66,7 @@ public:
     explicit Fetcher(StorageReplicatedMergeTree & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}

     /// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
-    MergeTreeData::MutableDataPartPtr fetchPart(
+    MergeTreeData::MutableDataPartPtr fetchSelectedPart(
         const StorageMetadataPtr & metadata_snapshot,
         ContextPtr context,
         const String & part_name,
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 0b05428ceaf..3491f6c9d4a 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2180,7 +2180,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
                     if (interserver_scheme != address.scheme)
                         throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme
                             + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
-                    part_desc->res_part = fetcher.fetchPart(
+                    part_desc->res_part = fetcher.fetchSelectedPart(
                         metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
                         address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
                         interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
@@ -2299,7 +2299,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
                     + "' != '" + address.scheme + "', can't fetch part from " + address.host,
                     ErrorCodes::LOGICAL_ERROR);

-            return fetcher.fetchPart(
+            return fetcher.fetchSelectedPart(
                 metadata_snapshot, getContext(), entry.new_part_name, source_replica_path,
                 address.host, address.replication_port,
                 timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
@@ -3934,7 +3934,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
                     + "' != '" + address.scheme + "', can't fetch part from " + address.host,
                     ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);

-            return fetcher.fetchPart(
+            return fetcher.fetchSelectedPart(
                 metadata_snapshot,
                 getContext(),
                 part_name,
@@ -4071,7 +4071,7 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
         currently_fetching_parts.erase(part_name);
     });

-    LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
+    LOG_DEBUG(log, "Fetching already known part {} from {}", part_name, source_replica_path);

     TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY,
         getSettings()->lock_acquire_timeout_for_background_operations);

@@ -4101,7 +4101,7 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
             + "' != '" + address.scheme + "', can't fetch part from " + address.host,
             ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);

-    return fetcher.fetchPart(
+    return fetcher.fetchSelectedPart(
         metadata_snapshot, getContext(), part_name, source_replica_path,
         address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
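
Note (illustration only, not part of the patch): the WriteBufferFromS3 hunks above replace a generic Exception with an S3Exception built from the AWS error type plus an fmt-style message that carries the key/bucket context. A minimal sketch of that call shape is below; the throwIfFailed helper name and its parameters are hypothetical, and the includes are only a guess at where S3Exception and String live in the tree.

    // Sketch of the error-reporting pattern used in the hunks above.
    // Assumes the (error type, format string, args...) S3Exception constructor
    // shown in the diff; include paths are an assumption, not taken from the patch.
    #include <IO/S3Common.h>   // S3Exception (assumed location)
    #include <base/types.h>    // String (assumed location)

    template <typename Outcome>
    void throwIfFailed(const Outcome & outcome, const String & bucket, const String & key)
    {
        if (outcome.IsSuccess())
            return;

        // Keep the original AWS error type so callers can still tell error
        // classes apart, while the message itself carries bucket/key context.
        throw S3Exception(
            outcome.GetError().GetErrorType(),
            "Message: {}, Key: {}, Bucket: {}",
            outcome.GetError().GetMessage(), key, bucket);
    }

The same shape appears in both completeMultipartUpload() and processPutRequest(), which is presumably why the error type is propagated rather than collapsed into ErrorCodes::S3_ERROR: the exception stays classifiable while the log line gains the key and bucket.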