Review fixes, fix race, add more logging to debug failing test

kssenii 2024-06-21 16:20:57 +02:00
parent a066b78c4d
commit 0b0f235a0d
7 changed files with 39 additions and 25 deletions

View File

@@ -258,16 +258,16 @@ void S3QueueIFileMetadata::setProcessed()
     LOG_TRACE(log, "Set file {} as processed (rows: {})", path, file_status->processed_rows);
 }
-void S3QueueIFileMetadata::setFailed(const std::string & exception, bool reduce_retry_count, bool overwrite_status)
+void S3QueueIFileMetadata::setFailed(const std::string & exception_message, bool reduce_retry_count, bool overwrite_status)
 {
     LOG_TRACE(log, "Setting file {} as failed (path: {}, reduce retry count: {}, exception: {})",
-              path, failed_node_path, reduce_retry_count, exception);
+              path, failed_node_path, reduce_retry_count, exception_message);
     ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles);
     if (overwrite_status || file_status->state != FileStatus::State::Failed)
-        file_status->onFailed(exception);
+        file_status->onFailed(exception_message);
-    node_metadata.last_exception = exception;
+    node_metadata.last_exception = exception_message;
     if (reduce_retry_count)
     {
@@ -282,7 +282,7 @@ void S3QueueIFileMetadata::setFailed(const std::string & exception, bool reduce_
     {
         auto full_exception = fmt::format(
             "First exception: {}, exception while setting file as failed: {}",
-            exception, getCurrentExceptionMessage(true));
+            exception_message, getCurrentExceptionMessage(true));
         file_status->onFailed(full_exception);
         throw;
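
Beyond the rename from `exception` to `exception_message`, the second hunk shows the pattern this function relies on: if persisting the failed state itself throws, the original message is folded into the new one before rethrowing, so neither error is lost. A minimal sketch of that pattern, using hypothetical stand-ins (FileStatusSketch, persistFailedState) rather than the real ClickHouse classes:

// Minimal sketch of the "keep the first exception" pattern from the hunk above.
#include <fmt/format.h>
#include <stdexcept>
#include <string>

struct FileStatusSketch                     // hypothetical stand-in for file_status
{
    std::string last_error;
    void onFailed(const std::string & message) { last_error = message; }
};

void persistFailedState()                   // stand-in for the keeper multi-request; assume it can throw
{
    throw std::runtime_error("keeper session expired");
}

void setFailedSketch(FileStatusSketch & file_status, const std::string & exception_message)
{
    file_status.onFailed(exception_message);
    try
    {
        persistFailedState();
    }
    catch (const std::exception & e)
    {
        // Keep the original message alongside the new one, then rethrow.
        auto full_exception = fmt::format(
            "First exception: {}, exception while setting file as failed: {}",
            exception_message, e.what());
        file_status.onFailed(full_exception);
        throw;
    }
}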

View File

@@ -55,7 +55,7 @@ public:
     bool setProcessing();
     void setProcessed();
-    void setFailed(const std::string & exception, bool reduce_retry_count, bool overwrite_status);
+    void setFailed(const std::string & exception_message, bool reduce_retry_count, bool overwrite_status);
     virtual void setProcessedAtStartRequests(
         Coordination::Requests & requests,

View File

@@ -222,7 +222,7 @@ S3QueueMetadata::Bucket S3QueueMetadata::getBucketForPath(const std::string & pa
 S3QueueOrderedFileMetadata::BucketHolderPtr
 S3QueueMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor)
 {
-    return S3QueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor);
+    return S3QueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log);
 }
 void S3QueueMetadata::initialize(

View File

@@ -45,13 +45,15 @@ S3QueueOrderedFileMetadata::BucketHolder::BucketHolder(
     int bucket_version_,
     const std::string & bucket_lock_path_,
     const std::string & bucket_lock_id_path_,
-    zkutil::ZooKeeperPtr zk_client_)
+    zkutil::ZooKeeperPtr zk_client_,
+    LoggerPtr log_)
     : bucket_info(std::make_shared<BucketInfo>(BucketInfo{
         .bucket = bucket_,
         .bucket_version = bucket_version_,
         .bucket_lock_path = bucket_lock_path_,
         .bucket_lock_id_path = bucket_lock_id_path_}))
     , zk_client(zk_client_)
+    , log(log_)
 {
 }
@@ -62,7 +64,7 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
     released = true;
-    LOG_TEST(getLogger("S3QueueBucketHolder"), "Releasing bucket {}, version {}",
+    LOG_TEST(log, "Releasing bucket {}, version {}",
         bucket_info->bucket, bucket_info->bucket_version);
     Coordination::Requests requests;
@@ -76,11 +78,11 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
     const auto code = zk_client->tryMulti(requests, responses);
     if (code == Coordination::Error::ZOK)
-        LOG_TEST(getLogger("S3QueueBucketHolder"), "Released bucket {}, version {}",
+        LOG_TEST(log, "Released bucket {}, version {}",
            bucket_info->bucket, bucket_info->bucket_version);
     else
-        LOG_TRACE(getLogger("S3QueueBucketHolder"),
-            "Failed to released bucket {}, version {}: {}. "
+        LOG_TRACE(log,
+            "Failed to release bucket {}, version {}: {}. "
             "This is normal if keeper session expired.",
             bucket_info->bucket, bucket_info->bucket_version, code);
@@ -89,6 +91,9 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
 S3QueueOrderedFileMetadata::BucketHolder::~BucketHolder()
 {
+    if (!released)
+        LOG_TEST(log, "Releasing bucket ({}) holder in destructor", bucket_info->bucket);
     try
     {
         release();
@@ -166,7 +171,8 @@ S3QueueOrderedFileMetadata::Bucket S3QueueOrderedFileMetadata::getBucketForPath(
 S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcquireBucket(
     const std::filesystem::path & zk_path,
     const Bucket & bucket,
-    const Processor & processor)
+    const Processor & processor,
+    LoggerPtr log_)
 {
     const auto zk_client = getZooKeeper();
     const auto bucket_lock_path = zk_path / "buckets" / toString(bucket) / "lock";
@@ -195,7 +201,7 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
     const auto bucket_lock_version = set_response->stat.version;
     LOG_TEST(
-        getLogger("S3QueueOrderedFileMetadata"),
+        log_,
         "Processor {} acquired bucket {} for processing (bucket lock version: {})",
         processor, bucket, bucket_lock_version);
@@ -204,7 +210,8 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
         bucket_lock_version,
         bucket_lock_path,
         bucket_lock_id_path,
-        zk_client);
+        zk_client,
+        log_);
     }
     if (code == Coordination::Error::ZNODEEXISTS)
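
The thread running through this file: `tryAcquireBucket()` and the `BucketHolder` it returns now receive the caller's `LoggerPtr` instead of calling `getLogger("S3QueueBucketHolder")` or `getLogger("S3QueueOrderedFileMetadata")` at each log site, so the new messages land under the owning table's logger name, which is what makes the failing test easier to trace in the server log. A minimal sketch of that injection pattern, with simplified stand-ins (Logger, LoggerPtr, BucketHolderSketch) rather than ClickHouse's real logging API:

#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct Logger { std::string name; };
using LoggerPtr = std::shared_ptr<Logger>;

struct BucketHolderSketch
{
    BucketHolderSketch(int bucket_, LoggerPtr log_) : bucket(bucket_), log(std::move(log_)) {}

    void release() const
    {
        // Logged through the injected logger, so the message carries the caller's logger name.
        std::cerr << log->name << ": released bucket " << bucket << '\n';
    }

    int bucket;
    LoggerPtr log;   // stored once at construction, reused by every log statement
};

BucketHolderSketch tryAcquireBucketSketch(int bucket, LoggerPtr log)
{
    std::cerr << log->name << ": acquired bucket " << bucket << '\n';
    return BucketHolderSketch{bucket, std::move(log)};
}

int main()
{
    auto log = std::make_shared<Logger>(Logger{"StorageS3Queue (some_table)"});
    auto holder = tryAcquireBucketSketch(/*bucket=*/1, log);
    holder.release();
}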

View File

@@ -36,7 +36,8 @@ public:
     static BucketHolderPtr tryAcquireBucket(
         const std::filesystem::path & zk_path,
         const Bucket & bucket,
-        const Processor & processor);
+        const Processor & processor,
+        LoggerPtr log_);
     static S3QueueOrderedFileMetadata::Bucket getBucketForPath(const std::string & path, size_t buckets_num);
@@ -72,14 +73,15 @@ private:
         bool ignore_if_exists);
 };
-struct S3QueueOrderedFileMetadata::BucketHolder
+struct S3QueueOrderedFileMetadata::BucketHolder : private boost::noncopyable
 {
     BucketHolder(
         const Bucket & bucket_,
         int bucket_version_,
         const std::string & bucket_lock_path_,
         const std::string & bucket_lock_id_path_,
-        zkutil::ZooKeeperPtr zk_client_);
+        zkutil::ZooKeeperPtr zk_client_,
+        LoggerPtr log_);
     ~BucketHolder();
@@ -96,6 +98,7 @@ private:
     const zkutil::ZooKeeperPtr zk_client;
     bool released = false;
     bool finished = false;
+    LoggerPtr log;
 };
 }
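
The `: private boost::noncopyable` base added to `BucketHolder` is worth a note: the holder is an RAII guard around the bucket lock, and a copy would produce two objects that each believe they own the lock and should release it. A tiny sketch of the same guarantee written with deleted copy operations (which is essentially what boost::noncopyable provides); `BucketLockHolderSketch` is a hypothetical stand-in, not the real class:

struct BucketLockHolderSketch
{
    explicit BucketLockHolderSketch(int bucket_) : bucket(bucket_) {}

    // Copying a lock holder would mean two objects both "owning" the same bucket lock
    // and both releasing it; deleting the copy operations turns that mistake into a
    // compile error instead of a runtime double release.
    BucketLockHolderSketch(const BucketLockHolderSketch &) = delete;
    BucketLockHolderSketch & operator=(const BucketLockHolderSketch &) = delete;

    ~BucketLockHolderSketch() { /* release the bucket lock exactly once */ }

    int bucket;
};

// BucketLockHolderSketch copy = holder;   // no longer compiles: copy constructor is deleted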

View File

@@ -126,9 +126,13 @@ void StorageS3QueueSource::FileIterator::returnForRetry(Source::ObjectInfoPtr ob
 void StorageS3QueueSource::FileIterator::releaseHoldBuckets()
 {
-    for (const auto & [_, holders] : bucket_holders)
+    for (const auto & [processor, holders] : bucket_holders)
+    {
+        LOG_TEST(log, "Releasing {} bucket holders for processor {}", holders.size(), processor);
         for (const auto & bucket_holder : holders)
             bucket_holder->release();
+    }
 }
 std::pair<StorageS3QueueSource::Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
@@ -635,7 +639,7 @@ Chunk StorageS3QueueSource::generateImpl()
     return {};
 }
-void StorageS3QueueSource::commit(bool success, const std::string & exception)
+void StorageS3QueueSource::commit(bool success, const std::string & exception_message)
 {
     LOG_TEST(log, "Having {} files to set as {}, failed files: {}",
              processed_files.size(), success ? "Processed" : "Failed", failed_during_read_files.size());
@@ -649,7 +653,7 @@ void StorageS3QueueSource::commit(bool success, const std::string & exception)
         }
         else
             file_metadata->setFailed(
-                exception,
+                exception_message,
                 /* reduce_retry_count */false,
                 /* overwrite_status */true);
 }

View File

@@ -57,8 +57,8 @@ public:
         void returnForRetry(Source::ObjectInfoPtr object_info);
         /// Release hold buckets.
-        /// In fact, they will anyway be release in destructors of BucketHolder,
-        /// but we anyway release it explicitly,
+        /// In fact, they could be released in destructors of BucketHolder,
+        /// but we anyway try to release them explicitly,
         /// because we want to be able to rethrow exceptions if they might happen.
         void releaseHoldBuckets();
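
The reworded comment above is the rationale for `releaseHoldBuckets()`: a destructor cannot let an exception escape, so releasing only in `~BucketHolder()` would force keeper errors to be swallowed, while an explicit release call lets them propagate to the caller. A short sketch of that split, with a hypothetical `HolderSketch` instead of the real classes:

#include <iostream>
#include <vector>

struct HolderSketch
{
    void release()   // may throw; the caller decides what to do with the error
    {
        if (released)
            return;
        released = true;
        // ... send the unlock request here; a failure would throw ...
    }

    ~HolderSketch()
    {
        try
        {
            release();   // best-effort fallback only: exceptions must not leave a destructor
        }
        catch (...)
        {
            std::cerr << "failed to release holder in destructor\n";
        }
    }

    bool released = false;
};

void releaseHoldBucketsSketch(std::vector<HolderSketch> & holders)
{
    // Explicit release: if the unlock fails, the exception reaches the caller
    // instead of being swallowed by ~HolderSketch().
    for (auto & holder : holders)
        holder.release();
}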
@@ -85,7 +85,7 @@ public:
         std::unordered_map<size_t, std::vector<BucketHolderPtr>> bucket_holders;
         /// Is glob_iterator finished?
-        bool iterator_finished = false;
+        std::atomic_bool iterator_finished = false;
         /// Only for processing without buckets.
         std::deque<Source::ObjectInfoPtr> objects_to_retry;
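
The `iterator_finished` change is the race fix from the commit message: the flag is written by whichever thread exhausts `glob_iterator` and read by the other processing threads, and concurrent unsynchronized access to a plain `bool` is a data race under the C++ memory model. A minimal sketch of the fixed shape, with a hypothetical `FileIteratorSketch` in place of the real iterator; because `std::atomic_bool` supports plain assignment and implicit conversion, the call sites can stay syntactically the same:

#include <atomic>

struct FileIteratorSketch
{
    // Written by whichever thread exhausts the glob iterator, read by the others.
    // With a plain bool those concurrent accesses are undefined behaviour;
    // std::atomic_bool makes them well-defined.
    std::atomic_bool iterator_finished = false;

    void markFinished() { iterator_finished = true; }      // atomic store
    bool finished() const { return iterator_finished; }    // atomic load from any thread
};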
@@ -123,7 +123,7 @@ public:
     /// Commit files after insertion into storage finished.
     /// `success` defines whether insertion was successful or not.
-    void commit(bool success, const std::string & exception = {});
+    void commit(bool success, const std::string & exception_message = {});
 private:
     const String name;