Merge pull request #41016 from ClickHouse/one_more_logging

Slightly improve diagnostics and remove assertions
alesapin 2022-09-07 15:23:17 +02:00 committed by GitHub
commit 365438d617
8 changed files with 136 additions and 43 deletions


@@ -56,7 +56,7 @@ void throwIfError(const Aws::Utils::Outcome<Result, Error> & response)
     if (!response.IsSuccess())
     {
         const auto & err = response.GetError();
-        throw Exception(ErrorCodes::S3_ERROR, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
+        throw S3Exception(fmt::format("{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType())), err.GetErrorType());
     }
 }
@@ -70,7 +70,7 @@ void throwIfUnexpectedError(const Aws::Utils::Outcome<Result, Error> & response,
     if (!response.IsSuccess() && (!if_exists || !isNotFoundError(response.GetError().GetErrorType())))
     {
         const auto & err = response.GetError();
-        throw Exception(ErrorCodes::S3_ERROR, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
+        throw S3Exception(err.GetErrorType(), "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
     }
 }


@@ -34,6 +34,7 @@ namespace ErrorCodes
     extern const int CANNOT_SEEK_THROUGH_FILE;
     extern const int SEEK_POSITION_OUT_OF_BOUND;
     extern const int LOGICAL_ERROR;
+    extern const int CANNOT_ALLOCATE_MEMORY;
 }
@@ -136,6 +137,23 @@ bool ReadBufferFromS3::nextImpl()
             ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Microseconds, watch.elapsedMicroseconds());
             ProfileEvents::increment(ProfileEvents::ReadBufferFromS3RequestsErrors, 1);

+            if (const auto * s3_exception = dynamic_cast<const S3Exception *>(&e))
+            {
+                /// It doesn't make sense to retry Access Denied or No Such Key
+                if (!s3_exception->isRetryableError())
+                {
+                    tryLogCurrentException(log);
+                    throw;
+                }
+            }
+
+            /// It doesn't make sense to retry allocator errors
+            if (e.code() == ErrorCodes::CANNOT_ALLOCATE_MEMORY)
+            {
+                tryLogCurrentException(log);
+                throw;
+            }
+
             LOG_DEBUG(
                 log,
                 "Caught exception while reading S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}, Attempt: {}, Message: {}",
@@ -306,7 +324,10 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
         return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
     }
     else
-        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
+    {
+        const auto & error = outcome.GetError();
+        throw S3Exception(error.GetMessage(), error.GetErrorType());
+    }
 }

 SeekableReadBufferPtr ReadBufferS3Factory::getReader()


@@ -35,6 +35,26 @@
 #    include <fstream>

+namespace DB
+{
+
+bool S3Exception::isRetryableError() const
+{
+    /// Looks like this list is quite conservative, add more codes if you wish
+    static const std::unordered_set<Aws::S3::S3Errors> unretryable_errors = {
+        Aws::S3::S3Errors::NO_SUCH_KEY,
+        Aws::S3::S3Errors::ACCESS_DENIED,
+        Aws::S3::S3Errors::INVALID_ACCESS_KEY_ID,
+        Aws::S3::S3Errors::INVALID_SIGNATURE,
+        Aws::S3::S3Errors::NO_SUCH_UPLOAD,
+        Aws::S3::S3Errors::NO_SUCH_BUCKET,
+    };
+
+    return !unretryable_errors.contains(code);
+}
+
+}
+
 namespace
 {
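A quick illustration of how the set behaves (a hypothetical snippet, not part of the commit): NO_SUCH_KEY is classified as permanent, while a throttling error such as SLOW_DOWN is not in the set and therefore stays retryable:

    #include <cassert>
    #include <IO/S3Common.h>

    int main()
    {
        DB::S3Exception permanent("The specified key does not exist.", Aws::S3::S3Errors::NO_SUCH_KEY);
        DB::S3Exception transient("Please reduce your request rate.", Aws::S3::S3Errors::SLOW_DOWN);

        assert(!permanent.isRetryableError()); /// listed in unretryable_errors
        assert(transient.isRetryableError());  /// not listed, so worth retrying
        return 0;
    }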


@@ -7,23 +7,62 @@
 #include <base/types.h>
 #include <aws/core/Aws.h>
 #include <aws/core/client/ClientConfiguration.h>
+#include <aws/s3/S3Errors.h>
 #include <IO/S3/PocoHTTPClient.h>
 #include <Poco/URI.h>
+#include <Common/Exception.h>

 namespace Aws::S3
 {
 class S3Client;
 }

 namespace DB
 {
-class RemoteHostFilter;
-struct HttpHeader;
-using HeaderCollection = std::vector<HttpHeader>;
+namespace ErrorCodes
+{
+    extern const int S3_ERROR;
+}
+
+class RemoteHostFilter;
+struct HttpHeader;
+using HeaderCollection = std::vector<HttpHeader>;
+
+class S3Exception : public Exception
+{
+public:
+    // Format message with fmt::format, like the logging functions.
+    template <typename... Args>
+    S3Exception(Aws::S3::S3Errors code_, fmt::format_string<Args...> fmt, Args &&... args)
+        : Exception(fmt::format(fmt, std::forward<Args>(args)...), ErrorCodes::S3_ERROR)
+        , code(code_)
+    {
+    }
+
+    S3Exception(const std::string & msg, Aws::S3::S3Errors code_)
+        : Exception(msg, ErrorCodes::S3_ERROR)
+        , code(code_)
+    {}
+
+    Aws::S3::S3Errors getS3ErrorCode() const
+    {
+        return code;
+    }
+
+    bool isRetryableError() const;
+
+private:
+    const Aws::S3::S3Errors code;
+};
 }

 namespace DB::S3
 {
 class ClientFactory
 {
 public:
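The two constructors mirror the two call styles seen in this PR: fmt-style (S3 code first, then a format string and arguments) and a pre-built message plus code. A hedged sketch of a caller that preserves and later inspects the S3 error code; the function name and the branch are illustrative, not from the commit:

    #include <aws/core/client/AWSError.h>
    #include <aws/s3/S3Errors.h>
    #include <IO/S3Common.h>

    /// Illustrative only: rethrow unless the failure was "object already absent".
    void ignoreMissingObject(const Aws::Client::AWSError<Aws::S3::S3Errors> & err)
    {
        try
        {
            /// fmt-style overload: the S3 code travels alongside the message.
            throw DB::S3Exception(err.GetErrorType(), "{} (Code: {})",
                                  err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
        }
        catch (const DB::S3Exception & e)
        {
            if (e.getS3ErrorCode() == Aws::S3::S3Errors::NO_SUCH_KEY)
                return; /// already gone, nothing to clean up
            throw;
        }
    }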


@@ -8,6 +8,7 @@
 #include <IO/WriteBufferFromS3.h>
 #include <IO/WriteHelpers.h>
+#include <IO/S3Common.h>
 #include <Interpreters/Context.h>
 #include <aws/s3/S3Client.h>
@@ -173,7 +174,9 @@ void WriteBufferFromS3::finalizeImpl()
         auto response = client_ptr->HeadObject(request);
         if (!response.IsSuccess())
-            throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket);
+            throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or the S3 API.", key, bucket), response.GetError().GetErrorType());
+        else
+            LOG_TRACE(log, "Object {} exists after upload", key);
     }
 }
@@ -197,7 +200,7 @@ void WriteBufferFromS3::createMultipartUpload()
         LOG_TRACE(log, "Multipart upload has created. Bucket: {}, Key: {}, Upload id: {}", bucket, key, multipart_upload_id);
     }
     else
-        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
+        throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
 }

 void WriteBufferFromS3::writePart()
@@ -309,7 +312,7 @@ void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
         LOG_TRACE(log, "Writing part finished. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, task.tag, part_tags.size());
     }
     else
-        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
+        throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());

     total_parts_uploaded++;
 }
@@ -430,7 +433,7 @@ void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
     if (outcome.IsSuccess())
         LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
     else
-        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
+        throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
 }

 void WriteBufferFromS3::waitForReadyBackGroundTasks()


@@ -532,25 +532,6 @@ void IMergeTreeDataPart::removeIfNeeded()
                 LOG_TRACE(storage.log, "Removed part from old location {}", path);
             }
         }
-        catch (const Exception & ex)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while removing part {} with path {}", name, path));
-
-            /// In this case we want to avoid assertions, because such errors are unavoidable in setup
-            /// with zero-copy replication.
-            if (const auto * keeper_exception = dynamic_cast<const Coordination::Exception *>(&ex))
-            {
-                if (Coordination::isHardwareError(keeper_exception->code))
-                    return;
-            }
-
-            /// FIXME If part it temporary, then directory will not be removed for 1 day (temporary_directories_lifetime).
-            /// If it's tmp_merge_<part_name> or tmp_fetch_<part_name>,
-            /// then all future attempts to execute part producing operation will fail with "directory already exists".
-            assert(!is_temp);
-            assert(state != MergeTreeDataPartState::DeleteOnDestroy);
-            assert(state != MergeTreeDataPartState::Temporary);
-        }
         catch (...)
         {
             tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while removing part {} with path {}", name, path));
@@ -558,11 +539,6 @@ void IMergeTreeDataPart::removeIfNeeded()
             /// FIXME If part it temporary, then directory will not be removed for 1 day (temporary_directories_lifetime).
             /// If it's tmp_merge_<part_name> or tmp_fetch_<part_name>,
             /// then all future attempts to execute part producing operation will fail with "directory already exists".
-            ///
-            /// For remote disks this issue is really frequent, so we don't about server here
-            assert(!is_temp);
-            assert(state != MergeTreeDataPartState::DeleteOnDestroy);
-            assert(state != MergeTreeDataPartState::Temporary);
         }
     }
 }
@@ -1433,7 +1409,10 @@ std::pair<bool, NameSet> IMergeTreeDataPart::canRemovePart() const
 {
     /// NOTE: It's needed for zero-copy replication
     if (force_keep_shared_data)
+    {
+        LOG_DEBUG(storage.log, "Blobs for part {} cannot be removed because it's forced to be kept", name);
         return std::make_pair(false, NameSet{});
+    }

     return storage.unlockSharedData(*this);
 }
@@ -1457,6 +1436,12 @@ void IMergeTreeDataPart::remove() const
     auto [can_remove, files_not_to_remove] = canRemovePart();

+    if (!can_remove)
+        LOG_TRACE(storage.log, "Blobs of part {} cannot be removed", name);
+
+    if (!files_not_to_remove.empty())
+        LOG_TRACE(storage.log, "Some blobs ({}) of part {} cannot be removed", fmt::join(files_not_to_remove, ", "), name);
+
     if (!isStoredOnDisk())
         return;


@@ -1901,7 +1901,9 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool
 void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_to_remove, NameSet * part_names_succeed)
 {
     const auto settings = getSettings();
-    if (parts_to_remove.size() > 1 && settings->max_part_removal_threads > 1 && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
+    if (parts_to_remove.size() > 1
+        && settings->max_part_removal_threads > 1
+        && parts_to_remove.size() > settings->concurrent_part_removal_threshold)
     {
         /// Parallel parts removal.
         size_t num_threads = std::min<size_t>(settings->max_part_removal_threads, parts_to_remove.size());
@@ -1916,7 +1918,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
             if (thread_group)
                 CurrentThread::attachToIfDetached(thread_group);

-            LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
+            LOG_DEBUG(log, "Removing part from filesystem {} (concurrently)", part->name);
             part->remove();
             if (part_names_succeed)
             {
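Spelled out, the reformatted condition chooses between sequential and pooled removal. A small sketch of the resulting thread count; Settings here is a stand-in struct with illustrative defaults, not the real MergeTreeSettings:

    #include <algorithm>
    #include <cstddef>

    struct Settings /// stand-in for the two MergeTreeSettings fields involved
    {
        size_t max_part_removal_threads = 16;           /// illustrative value
        size_t concurrent_part_removal_threshold = 100; /// illustrative value
    };

    /// How many threads would clearPartsFromFilesystemImpl use for N parts?
    size_t partRemovalThreads(const Settings & s, size_t num_parts)
    {
        bool parallel = num_parts > 1
            && s.max_part_removal_threads > 1
            && num_parts > s.concurrent_part_removal_threshold;
        return parallel ? std::min(s.max_part_removal_threads, num_parts) : 1;
    }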


@@ -3831,9 +3831,10 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
     LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);

+    auto settings_ptr = getSettings();
     TableLockHolder table_lock_holder;
     if (!to_detached)
-        table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
+        table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, settings_ptr->lock_acquire_timeout_for_background_operations);

     /// Logging
     Stopwatch stopwatch;
@@ -3857,7 +3858,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
             covered_part_info.mutation = 0;
             auto source_part = getActiveContainingPart(covered_part_info);

-            if (source_part)
+            /// Fetch for zero-copy replication is cheap and straightforward, so we don't use a local clone here
+            if (source_part && (!settings_ptr->allow_remote_fs_zero_copy_replication || !source_part->data_part_storage->supportZeroCopyReplication()))
             {
                 auto source_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
                     source_part->getColumns(), source_part->checksums);
@@ -3897,7 +3899,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
                 part_to_clone = source_part;
             }
         }
-    }

     ReplicatedMergeTreeAddress address;
@@ -7538,21 +7539,39 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part,
 std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
 {
-    if (!part.data_part_storage || !part.isStoredOnDisk())
+    auto settings = getSettings();
+    if (!settings->allow_remote_fs_zero_copy_replication)
         return std::make_pair(true, NameSet{});

-    if (!part.data_part_storage || !part.data_part_storage->supportZeroCopyReplication())
+    if (!part.data_part_storage || !part.isStoredOnDisk())
+    {
+        LOG_TRACE(log, "Part {} is not stored on disk, blobs can be removed", part.name);
         return std::make_pair(true, NameSet{});
+    }
+
+    if (!part.data_part_storage || !part.data_part_storage->supportZeroCopyReplication())
+    {
+        LOG_TRACE(log, "Part {} is not stored on zero-copy replicated disk, blobs can be removed", part.name);
+        return std::make_pair(true, NameSet{});
+    }

     /// If part is temporary refcount file may be absent
     if (part.data_part_storage->exists(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK))
     {
         auto ref_count = part.data_part_storage->getRefCount(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
         if (ref_count > 0) /// Keep part shard info for frozen backups
+        {
+            LOG_TRACE(log, "Part {} has more than zero local references ({}), blobs cannot be removed", part.name, ref_count);
             return std::make_pair(false, NameSet{});
+        }
+        else
+        {
+            LOG_TRACE(log, "Part {} has zero local references, will check in zookeeper whether blobs can be removed", part.name);
+        }
     }
     else
     {
+        LOG_TRACE(log, "Part {} looks temporary, because checksums file doesn't exist, blobs can be removed", part.name);
         /// Temporary part with some absent file cannot be locked in shared mode
         return std::make_pair(true, NameSet{});
     }
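Condensed, the early returns above form a decision ladder over the part's state. A hedged summary with stand-in types and simplified logic, not the actual method:

    /// Stand-in for the fields unlockSharedData actually consults.
    struct PartView
    {
        bool zero_copy_enabled;        /// allow_remote_fs_zero_copy_replication
        bool stored_on_disk;
        bool disk_supports_zero_copy;
        bool has_refcount_file;        /// FILE_FOR_REFERENCES_CHECK exists
        unsigned local_ref_count;
    };

    enum class BlobDecision { Removable, Keep, AskZooKeeper };

    BlobDecision canRemoveBlobs(const PartView & p)
    {
        if (!p.zero_copy_enabled)       return BlobDecision::Removable; /// nothing is shared
        if (!p.stored_on_disk)          return BlobDecision::Removable;
        if (!p.disk_supports_zero_copy) return BlobDecision::Removable;
        if (!p.has_refcount_file)       return BlobDecision::Removable; /// temporary part
        if (p.local_ref_count > 0)      return BlobDecision::Keep;      /// e.g. frozen backup
        return BlobDecision::AskZooKeeper; /// zero local refs: consult the zookeeper locks
    }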
@@ -7600,10 +7619,14 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
         if (!children.empty())
         {
-            LOG_TRACE(logger, "Found {} ({}) zookeeper locks for {}", zookeeper_part_uniq_node, children.size(), fmt::join(children, ", "));
+            LOG_TRACE(logger, "Found {} ({}) zookeeper locks for {}", children.size(), fmt::join(children, ", "), zookeeper_part_uniq_node);
             part_has_no_more_locks = false;
             continue;
         }
+        else
+        {
+            LOG_TRACE(logger, "No more children left for {}, will try to remove the whole node", zookeeper_part_uniq_node);
+        }

         auto error_code = zookeeper_ptr->tryRemove(zookeeper_part_uniq_node);
@@ -7654,7 +7677,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
     }
     else
     {
-        LOG_TRACE(logger, "Can't remove parent zookeeper lock {} for part {}, because children {} ({}) were concurrently created",
+        LOG_TRACE(logger, "Can't remove parent zookeeper lock {} for part {}, because children {} ({}) exist",
             zookeeper_part_node, part_name, children.size(), fmt::join(children, ", "));
     }
 }
} }