mirror of https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #41108 from ClickHouse/more_logging
More logging for S3
This commit is contained in:
commit 62c911924b
@@ -282,7 +282,10 @@ String DiskObjectStorage::getUniqueId(const String & path) const

 bool DiskObjectStorage::checkUniqueId(const String & id) const
 {
     if (!id.starts_with(object_storage_root_path))
+    {
+        LOG_DEBUG(log, "Blob with id {} doesn't start with blob storage prefix {}", id, object_storage_root_path);
         return false;
+    }

     auto object = StoredObject::create(*object_storage, id, {}, {}, true);
     return object_storage->exists(object);
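The hunk above turns a silent `return false` into one that explains itself in the debug log. A minimal standalone sketch of the pattern, with log_debug standing in for ClickHouse's LOG_DEBUG macro and check_blob_id as a hypothetical reduction of checkUniqueId:

#include <iostream>
#include <string>

// Stand-in for ClickHouse's LOG_DEBUG macro; writes to stderr here.
static void log_debug(const std::string & msg) { std::cerr << "[debug] " << msg << '\n'; }

// Hypothetical reduction of DiskObjectStorage::checkUniqueId: when the
// prefix check fails, say why before returning false.
bool check_blob_id(const std::string & id, const std::string & root_prefix)
{
    if (id.rfind(root_prefix, 0) != 0) // id.starts_with(root_prefix), pre-C++20
    {
        log_debug("Blob with id " + id + " doesn't start with blob storage prefix " + root_prefix);
        return false;
    }
    return true; // the real code additionally checks object_storage->exists(object)
}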
@@ -346,9 +346,10 @@ void WriteBufferFromS3::completeMultipartUpload()
         LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, tags.size());
     else
     {
-        throw Exception(ErrorCodes::S3_ERROR, "{} Tags:{}",
-            outcome.GetError().GetMessage(),
-            fmt::join(tags.begin(), tags.end(), " "));
+        throw S3Exception(
+            outcome.GetError().GetErrorType(),
+            "Message: {}, Key: {}, Bucket: {}, Tags: {}",
+            outcome.GetError().GetMessage(), key, bucket, fmt::join(tags.begin(), tags.end(), " "));
     }
 }
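This substitution is more than a message change: the generic Exception(ErrorCodes::S3_ERROR, ...) discarded the AWS error type, while S3Exception keeps it, so callers can distinguish, say, a throttling error from a missing upload. A hedged sketch of the idea, with illustrative names rather than the real ClickHouse or AWS SDK declarations:

#include <stdexcept>
#include <string>

// Illustrative error-type enum; the real code carries the AWS SDK's error type.
enum class S3ErrorTypeSketch { SLOW_DOWN, NO_SUCH_UPLOAD, ACCESS_DENIED, UNKNOWN };

// Exception that keeps the machine-readable type next to the human message,
// so retry logic upstream does not have to parse message strings.
class S3ExceptionSketch : public std::runtime_error
{
public:
    S3ExceptionSketch(S3ErrorTypeSketch type_, const std::string & message)
        : std::runtime_error(message), type(type_) {}

    bool isRetryable() const { return type == S3ErrorTypeSketch::SLOW_DOWN; }

private:
    S3ErrorTypeSketch type;
};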
@@ -433,7 +434,10 @@ void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
     if (outcome.IsSuccess())
         LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
     else
-        throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
+        throw S3Exception(
+            outcome.GetError().GetErrorType(),
+            "Message: {}, Key: {}, Bucket: {}, Object size: {}, WithPool: {}",
+            outcome.GetError().GetMessage(), key, bucket, task.req.GetContentLength(), with_pool);
 }

 void WriteBufferFromS3::waitForReadyBackGroundTasks()
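Note the call-site shape change here: the old overload took the message first and the error type second, while the new one takes the type first, then an fmt-style format string and its arguments, so every throw site can attach key/bucket/size context cheaply. An illustrative sketch of such a constructor, assuming the {fmt} library and not claiming to match the real S3Exception:

#include <fmt/core.h>
#include <stdexcept>
#include <string>
#include <utility>

// Illustrative variadic-format error type; not the real S3Exception.
class FormattedErrorSketch : public std::runtime_error
{
public:
    template <typename... Args>
    FormattedErrorSketch(int error_type_, fmt::format_string<Args...> format, Args &&... args)
        : std::runtime_error(fmt::format(format, std::forward<Args>(args)...))
        , error_type(error_type_) {}

    const int error_type;
};

// Usage mirroring the new call site (kS3Error, msg, key, bucket are hypothetical):
//   throw FormattedErrorSketch(kS3Error, "Message: {}, Key: {}, Bucket: {}", msg, key, bucket);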
@@ -399,7 +399,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
     throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
 }

-MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
+MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
     const StorageMetadataPtr & metadata_snapshot,
     ContextPtr context,
     const String & part_name,
@@ -420,6 +420,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     if (blocker.isCancelled())
         throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);

+    const auto data_settings = data.getSettings();
+
+    if (data_settings->allow_remote_fs_zero_copy_replication && !try_zero_copy)
+        LOG_WARNING(log, "Zero copy replication enabled, but trying to fetch part {} without zero copy", part_name);
+
     /// It should be "tmp-fetch_" and not "tmp_fetch_", because we can fetch part to detached/,
     /// but detached part name prefix should not contain underscore.
     static const String TMP_PREFIX = "tmp-fetch_";
@@ -429,7 +434,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(

     /// Validation of the input that may come from malicious replica.
     auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
-    const auto data_settings = data.getSettings();

     Poco::URI uri;
     uri.setScheme(interserver_scheme);
@@ -465,6 +469,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
             capability.push_back(toString(disk->getDataSourceDescription().type));
         }
     }
+
     if (!capability.empty())
     {
         ::sort(capability.begin(), capability.end());
@@ -474,6 +479,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     }
     else
     {
+        if (data_settings->allow_remote_fs_zero_copy_replication)
+            LOG_WARNING(log, "Cannot select any zero-copy disk for {}", part_name);
+
         try_zero_copy = false;
     }

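Together with the earlier hunk, this gives the zero-copy path two loud failure modes: the caller asked to skip zero-copy even though the setting is on, or no capable disk could be selected. A condensed, self-contained sketch of that decision, with DiskSketch and log_warning as stand-ins for the real ClickHouse types:

#include <iostream>
#include <string>
#include <vector>

// Stand-ins for the real ClickHouse types; illustrative only.
struct DiskSketch { bool supports_zero_copy = false; };
static void log_warning(const std::string & msg) { std::cerr << "[warning] " << msg << '\n'; }

// Condensed shape of the decision these two warnings instrument: can this
// fetch use zero-copy replication, or must it fall back to a full copy?
bool decideZeroCopy(bool setting_enabled, bool try_zero_copy,
                    const std::vector<DiskSketch> & disks, const std::string & part_name)
{
    if (setting_enabled && !try_zero_copy)
        log_warning("Zero copy replication enabled, but trying to fetch part " + part_name + " without zero copy");

    if (!try_zero_copy)
        return false;

    for (const auto & disk : disks)
        if (disk.supports_zero_copy)
            return true; // a capable disk exists; zero-copy fetch can proceed

    if (setting_enabled)
        log_warning("Cannot select any zero-copy disk for " + part_name);
    return false; // fall back to a regular full-copy fetch
}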
@@ -585,7 +593,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
         temporary_directory_lock = {};

         /// Try again but without zero-copy
-        return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
+        return fetchSelectedPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
             user, password, interserver_scheme, throttler, to_detached, tmp_prefix, nullptr, false, disk);
     }
 }
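This hunk shows the retry shape that the renamed function keeps: a failed zero-copy attempt releases its temporary-directory lock, then re-enters the same function once with zero-copy disabled (the trailing false argument). A runnable sketch of that shape, with tryZeroCopyFetch and fullFetch as assumed helper names:

#include <optional>
#include <string>

using PartSketch = std::string;

// Assumed helpers, standing in for the zero-copy and regular download paths.
static std::optional<PartSketch> tryZeroCopyFetch(const std::string & /*name*/) { return std::nullopt; }
static PartSketch fullFetch(const std::string & name) { return "downloaded:" + name; }

// Mirror of the diff's control flow: one recursive call, at most one retry.
PartSketch fetchSelectedPartSketch(const std::string & name, bool try_zero_copy)
{
    if (try_zero_copy)
    {
        if (auto part = tryZeroCopyFetch(name))
            return *part;
        /// Try again but without zero-copy
        return fetchSelectedPartSketch(name, /*try_zero_copy=*/false);
    }
    return fullFetch(name);
}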
@@ -66,7 +66,7 @@ public:
     explicit Fetcher(StorageReplicatedMergeTree & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}

     /// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
-    MergeTreeData::MutableDataPartPtr fetchPart(
+    MergeTreeData::MutableDataPartPtr fetchSelectedPart(
         const StorageMetadataPtr & metadata_snapshot,
         ContextPtr context,
         const String & part_name,
@@ -2180,7 +2180,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
             if (interserver_scheme != address.scheme)
                 throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);

-            part_desc->res_part = fetcher.fetchPart(
+            part_desc->res_part = fetcher.fetchSelectedPart(
                 metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
                 address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(),
                 interserver_scheme, replicated_fetches_throttler, false, TMP_PREFIX + "fetch_");
@@ -2299,7 +2299,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
                 + "' != '" + address.scheme + "', can't fetch part from " + address.host,
                 ErrorCodes::LOGICAL_ERROR);

-        return fetcher.fetchPart(
+        return fetcher.fetchSelectedPart(
             metadata_snapshot, getContext(), entry.new_part_name, source_replica_path,
             address.host, address.replication_port,
             timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme,
@@ -3934,7 +3934,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
             + "' != '" + address.scheme + "', can't fetch part from " + address.host,
             ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);

-        return fetcher.fetchPart(
+        return fetcher.fetchSelectedPart(
             metadata_snapshot,
             getContext(),
             part_name,
@@ -4071,7 +4071,7 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
         currently_fetching_parts.erase(part_name);
     });

-    LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
+    LOG_DEBUG(log, "Fetching already known part {} from {}", part_name, source_replica_path);

     TableLockHolder table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);

@@ -4101,7 +4101,7 @@ DataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
             + "' != '" + address.scheme + "', can't fetch part from " + address.host,
             ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);

-        return fetcher.fetchPart(
+        return fetcher.fetchSelectedPart(
             metadata_snapshot, getContext(), part_name, source_replica_path,
             address.host, address.replication_port,
             timeouts, credentials->getUser(), credentials->getPassword(),