This commit is contained in:
kssenii 2024-06-24 12:17:29 +02:00
parent 0b0f235a0d
commit cd6995e266

View File

@ -74,10 +74,12 @@ StorageS3QueueSource::Source::ObjectInfoPtr StorageS3QueueSource::FileIterator::
{
if (metadata->useBucketsForProcessing())
{
std::lock_guard lock(mutex);
std::tie(object_info, bucket_info) = getNextKeyFromAcquiredBucket(processor);
}
else
{
std::lock_guard lock(mutex);
if (objects_to_retry.empty())
{
object_info = glob_iterator->next(processor);
@ -138,10 +140,6 @@ void StorageS3QueueSource::FileIterator::releaseHoldBuckets()
std::pair<StorageS3QueueSource::Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
{
/// We need this lock to maintain consistency between listing s3 directory
/// and getting/putting result into listed_keys_cache.
std::lock_guard lock(mutex);
auto bucket_holder_it = bucket_holders.emplace(processor, std::vector<BucketHolderPtr>{}).first;
BucketHolder * current_bucket_holder = bucket_holder_it->second.empty() || bucket_holder_it->second.back()->isFinished()
? nullptr