Do not release buckets until the files are committed

kssenii 2024-06-18 18:23:42 +02:00
parent bd9241dabe
commit 1c415479f0
5 changed files with 86 additions and 61 deletions
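Previously, FileIterator released a bucket as soon as its keys were exhausted, i.e. before the files read from that bucket were committed, so the bucket could be picked up by another processor while the outcome of the first attempt was still unrecorded. Now a fully processed BucketHolder is only marked finished; holders are accumulated per processor and released explicitly via releaseHoldBuckets() once StorageS3Queue::streamToViews() has committed the sources, on both the success and the failure path.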

@@ -86,12 +86,16 @@ struct S3QueueOrderedFileMetadata::BucketHolder
Bucket getBucket() const { return bucket_info->bucket; }
BucketInfoPtr getBucketInfo() const { return bucket_info; }
void setFinished() { finished = true; }
bool isFinished() const { return finished; }
void release();
private:
BucketInfoPtr bucket_info;
const zkutil::ZooKeeperPtr zk_client;
bool released = false;
bool finished = false;
};
}
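The holder now separates two states: finished (iteration over the bucket's keys is done) and released (the bucket lock is actually given up). A minimal standalone sketch of the intended lifecycle, assuming release() stays idempotent through the released flag and that the destructor falls back to it, as the header comments below state; the actual ZooKeeper node removal is elided:

    struct BucketHolder
    {
        void setFinished() { finished = true; }      // all keys of the bucket were consumed
        bool isFinished() const { return finished; }

        void release()
        {
            if (released)
                return;                              // idempotent: reachable from releaseHoldBuckets() and the destructor
            released = true;
            // zk_client->remove(...);               // give up the bucket lock in ZooKeeper (path elided)
        }

        ~BucketHolder() { release(); }               // safety net if the explicit release never happened

    private:
        bool released = false;
        bool finished = false;
    };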

@@ -33,7 +33,7 @@ namespace ErrorCodes
StorageS3QueueSource::S3QueueObjectInfo::S3QueueObjectInfo(
const ObjectInfo & object_info,
Metadata::FileMetadataPtr file_metadata_)
S3QueueMetadata::FileMetadataPtr file_metadata_)
: ObjectInfo(object_info.relative_path, object_info.metadata)
, file_metadata(file_metadata_)
{
@@ -41,7 +41,7 @@ StorageS3QueueSource::S3QueueObjectInfo::S3QueueObjectInfo(
StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
std::unique_ptr<Source::GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_)
: StorageObjectStorageSource::IIterator("S3QueueIterator")
@@ -67,9 +67,9 @@ size_t StorageS3QueueSource::FileIterator::estimatedKeysCount()
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method estimatedKeysCount is not implemented");
}
StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
StorageS3QueueSource::Source::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
{
ObjectInfoPtr object_info;
Source::ObjectInfoPtr object_info;
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info;
while (!shutdown_called)
@@ -112,7 +112,7 @@ StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl
return {};
}
void StorageS3QueueSource::FileIterator::returnForRetry(ObjectInfoPtr object_info)
void StorageS3QueueSource::FileIterator::returnForRetry(Source::ObjectInfoPtr object_info)
{
chassert(object_info);
if (metadata->useBucketsForProcessing())
@@ -126,20 +126,30 @@ void StorageS3QueueSource::FileIterator::returnForRetry(ObjectInfoPtr object_inf
}
}
void StorageS3QueueSource::FileIterator::releaseHoldBuckets()
{
for (const auto & [_, holders] : bucket_holders)
for (const auto & bucket_holder : holders)
bucket_holder->release();
}
std::pair<StorageS3QueueSource::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
std::pair<StorageS3QueueSource::Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
{
/// We need this lock to maintain consistency between listing the s3 directory
/// and getting/putting results into listed_keys_cache.
std::lock_guard lock(buckets_mutex);
auto bucket_holder_it = bucket_holders.emplace(processor, nullptr).first;
auto bucket_holder_it = bucket_holders.emplace(processor, std::vector<BucketHolderPtr>{}).first;
BucketHolder * current_bucket_holder = bucket_holder_it->second.empty() || bucket_holder_it->second.back()->isFinished()
? nullptr
: bucket_holder_it->second.back().get();
auto current_processor = toString(processor);
LOG_TEST(
log, "Current processor: {}, acquired bucket: {}",
processor, bucket_holder_it->second ? toString(bucket_holder_it->second->getBucket()) : "None");
processor, current_bucket_holder ? toString(current_bucket_holder->getBucket()) : "None");
while (true)
{
@@ -148,9 +158,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
/// In case it is already acquired, they put the key into listed_keys_cache,
/// so that the thread that acquired the bucket will be able to see
/// those keys without the need to list the s3 directory once again.
if (bucket_holder_it->second)
if (current_bucket_holder)
{
const auto bucket = bucket_holder_it->second->getBucket();
const auto bucket = current_bucket_holder->getBucket();
auto it = listed_keys_cache.find(bucket);
if (it != listed_keys_cache.end())
{
@@ -183,7 +193,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Current bucket: {}, will process file: {}",
bucket, object_info->getFileName());
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
return std::pair{object_info, current_bucket_holder->getBucketInfo()};
}
LOG_TEST(log, "Cache of bucket {} is empty", bucket);
@@ -198,9 +208,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
if (iterator_finished)
{
/// Bucket is fully processed - release the bucket.
bucket_holder_it->second->release();
bucket_holder_it->second.reset();
/// Bucket is fully processed, but we will release it later,
/// once the files are written and committed via the commit() method.
current_bucket_holder->setFinished();
}
}
/// If processing thread has already acquired some bucket
@@ -209,8 +219,10 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
/// because one processing thread can acquire only one bucket at a time.
/// Once a thread is finished with its acquired bucket, it checks listed_keys_cache
/// to see if there are keys from buckets not acquired by anyone.
if (!bucket_holder_it->second)
if (!current_bucket_holder)
{
LOG_TEST(log, "Checking caches keys: {}", listed_keys_cache.size());
for (auto it = listed_keys_cache.begin(); it != listed_keys_cache.end();)
{
auto & [bucket, bucket_info] = *it;
@@ -235,8 +247,8 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue;
}
bucket_holder_it->second = metadata->tryAcquireBucket(bucket, current_processor);
if (!bucket_holder_it->second)
auto acquired_bucket = metadata->tryAcquireBucket(bucket, current_processor);
if (!acquired_bucket)
{
LOG_TEST(log, "Bucket {} is already locked for processing (keys: {})",
bucket, bucket_keys.size());
@@ -244,6 +256,9 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue;
}
bucket_holder_it->second.push_back(acquired_bucket);
current_bucket_holder = bucket_holder_it->second.back().get();
bucket_processor = current_processor;
/// Take the key from the front, the order is important.
@@ -253,7 +268,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Acquired bucket: {}, will process file: {}",
bucket, object_info->getFileName());
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
return std::pair{object_info, current_bucket_holder->getBucketInfo()};
}
}
@@ -271,12 +286,12 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Found next file: {}, bucket: {}, current bucket: {}, cached_keys: {}",
object_info->getFileName(), bucket,
bucket_holder_it->second ? toString(bucket_holder_it->second->getBucket()) : "None",
current_bucket_holder ? toString(current_bucket_holder->getBucket()) : "None",
bucket_cache.keys.size());
if (bucket_holder_it->second)
if (current_bucket_holder)
{
if (bucket_holder_it->second->getBucket() != bucket)
if (current_bucket_holder->getBucket() != bucket)
{
/// Acquired bucket differs from object's bucket,
/// put it into bucket's cache and continue.
@@ -284,13 +299,16 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue;
}
/// Bucket is already acquired, process the file.
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
return std::pair{object_info, current_bucket_holder->getBucketInfo()};
}
else
{
bucket_holder_it->second = metadata->tryAcquireBucket(bucket, current_processor);
if (bucket_holder_it->second)
auto acquired_bucket = metadata->tryAcquireBucket(bucket, current_processor);
if (acquired_bucket)
{
bucket_holder_it->second.push_back(acquired_bucket);
current_bucket_holder = bucket_holder_it->second.back().get();
bucket_cache.processor = current_processor;
if (!bucket_cache.keys.empty())
{
@@ -300,7 +318,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
object_info = bucket_cache.keys.front();
bucket_cache.keys.pop_front();
}
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
return std::pair{object_info, current_bucket_holder->getBucketInfo()};
}
else
{
@@ -312,12 +330,6 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
}
else
{
if (bucket_holder_it->second)
{
bucket_holder_it->second->release();
bucket_holder_it->second.reset();
}
LOG_TEST(log, "Reached the end of file iterator");
iterator_finished = true;
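Taken together, the changes above replace the single bucket holder per processor with a history of holders: bucket_holders now maps a processor to a vector, the current holder is the last entry if it is not yet finished, and a fully processed bucket is merely marked finished instead of released. A condensed sketch of that lookup-and-acquire flow under buckets_mutex, using the identifiers from the diff and assuming tryAcquireBucket() returns a null holder when another processor owns the bucket, as the log messages above indicate:

    std::lock_guard lock(buckets_mutex);
    auto & holders = bucket_holders[processor];           // one vector of holders per processor
    BucketHolder * current =
        holders.empty() || holders.back()->isFinished()
            ? nullptr                                     // no active bucket, try to acquire one
            : holders.back().get();                       // the last acquired bucket is still active
    if (!current)
    {
        if (auto acquired = metadata->tryAcquireBucket(bucket, current_processor))
        {
            holders.push_back(acquired);                  // kept until releaseHoldBuckets() after commit
            current = holders.back().get();
        }
    }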

@@ -20,24 +20,18 @@ class StorageS3QueueSource : public ISource, WithContext
{
public:
using Storage = StorageObjectStorage;
using ConfigurationPtr = Storage::ConfigurationPtr;
using GlobIterator = StorageObjectStorageSource::GlobIterator;
using ZooKeeperGetter = std::function<zkutil::ZooKeeperPtr()>;
using Source = StorageObjectStorageSource;
using RemoveFileFunc = std::function<void(std::string)>;
using FileStatusPtr = S3QueueMetadata::FileStatusPtr;
using ReaderHolder = StorageObjectStorageSource::ReaderHolder;
using Metadata = S3QueueMetadata;
using ObjectInfo = StorageObjectStorageSource::ObjectInfo;
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectInfos = std::vector<ObjectInfoPtr>;
using BucketHolderPtr = S3QueueOrderedFileMetadata::BucketHolderPtr;
using BucketHolder = S3QueueOrderedFileMetadata::BucketHolder;
struct S3QueueObjectInfo : public ObjectInfo
struct S3QueueObjectInfo : public Source::ObjectInfo
{
S3QueueObjectInfo(
const ObjectInfo & object_info,
Metadata::FileMetadataPtr file_metadata_);
const Source::ObjectInfo & object_info,
S3QueueMetadata::FileMetadataPtr file_metadata_);
Metadata::FileMetadataPtr file_metadata;
S3QueueMetadata::FileMetadataPtr file_metadata;
};
class FileIterator : public StorageObjectStorageSource::IIterator
@@ -45,7 +39,7 @@ public:
public:
FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
std::unique_ptr<Source::GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_);
@@ -54,37 +48,51 @@ public:
/// Note:
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
ObjectInfoPtr nextImpl(size_t processor) override;
Source::ObjectInfoPtr nextImpl(size_t processor) override;
size_t estimatedKeysCount() override;
void returnForRetry(ObjectInfoPtr object_info);
/// If a key was taken from the iterator via a next() call,
/// we might later want to return it for retrying.
void returnForRetry(Source::ObjectInfoPtr object_info);
/// Release held buckets.
/// They would be released anyway in the destructors of BucketHolder,
/// but we release them explicitly,
/// because we want to be able to rethrow any exceptions that might occur.
void releaseHoldBuckets();
private:
using Bucket = S3QueueMetadata::Bucket;
using Processor = S3QueueMetadata::Processor;
const std::shared_ptr<S3QueueMetadata> metadata;
const std::unique_ptr<GlobIterator> glob_iterator;
const std::unique_ptr<Source::GlobIterator> glob_iterator;
std::atomic<bool> & shutdown_called;
std::mutex mutex;
LoggerPtr log;
std::mutex buckets_mutex;
struct ListedKeys
{
std::deque<ObjectInfoPtr> keys;
std::deque<Source::ObjectInfoPtr> keys;
std::optional<Processor> processor;
};
std::unordered_map<Bucket, ListedKeys> listed_keys_cache;
/// A cache of keys that were listed via glob_iterator but not yet taken for processing.
std::unordered_map<Bucket, ListedKeys> listed_keys_cache TSA_GUARDED_BY(buckets_mutex);
/// We store a vector of holders because we cannot release them until the processed files are committed.
std::unordered_map<size_t, std::vector<BucketHolderPtr>> bucket_holders TSA_GUARDED_BY(buckets_mutex);
/// Protects listed_keys_cache and bucket_holders.
std::mutex buckets_mutex;
/// Is glob_iterator finished?
bool iterator_finished = false;
std::unordered_map<size_t, S3QueueOrderedFileMetadata::BucketHolderPtr> bucket_holders;
/// Only for processing without buckets.
std::deque<ObjectInfoPtr> objects_to_retry;
std::deque<Source::ObjectInfoPtr> objects_to_retry;
std::pair<ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
std::pair<Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
bool hasKeysForProcessor(const Processor & processor) const;
};
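The TSA_GUARDED_BY annotations make the locking contract checkable: clang's thread-safety analysis will reject any access to listed_keys_cache or bucket_holders without buckets_mutex held. A small illustration, assuming the macro expands to clang's guarded_by attribute as in ClickHouse's TSA helpers; touchHolders() is a hypothetical function used purely for illustration:

    std::mutex buckets_mutex;
    std::unordered_map<size_t, std::vector<BucketHolderPtr>> bucket_holders
        TSA_GUARDED_BY(buckets_mutex);

    void touchHolders()
    {
        // bucket_holders.clear();            // -Wthread-safety error: buckets_mutex not held
        std::lock_guard lock(buckets_mutex);
        bucket_holders.clear();               // fine: buckets_mutex is held
    }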
@@ -137,11 +145,11 @@ private:
RemoveFileFunc remove_file_func;
LoggerPtr log;
std::vector<Metadata::FileMetadataPtr> processed_files;
std::vector<Metadata::FileMetadataPtr> failed_during_read_files;
std::vector<S3QueueMetadata::FileMetadataPtr> processed_files;
std::vector<S3QueueMetadata::FileMetadataPtr> failed_during_read_files;
ReaderHolder reader;
std::future<ReaderHolder> reader_future;
Source::ReaderHolder reader;
std::future<Source::ReaderHolder> reader_future;
std::atomic<bool> initialized{false};
size_t processed_rows_from_file = 0;
@@ -150,8 +158,6 @@ private:
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
S3QueueOrderedFileMetadata::BucketHolderPtr current_bucket_holder;
Chunk generateImpl();
void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, S3QueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
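The remaining header changes are mechanical renames from the dropped Metadata/ObjectInfo aliases to explicit S3QueueMetadata and Source:: names; judging by the hunk counts, the per-source current_bucket_holder member is also removed, since bucket ownership now lives entirely in FileIterator::bucket_holders.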

@@ -521,12 +521,15 @@ bool StorageS3Queue::streamToViews()
{
for (auto & source : sources)
source->commit(/* success */false, getCurrentExceptionMessage(true));
file_iterator->releaseHoldBuckets();
throw;
}
for (auto & source : sources)
source->commit(/* success */true);
file_iterator->releaseHoldBuckets();
total_rows += rows;
}
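This ordering is the point of the commit: the sources record the outcome first, and only then are the buckets let go, on both paths. A sketch of the resulting shape, mirroring the hunk above:

    try
    {
        /// ... stream a batch of files to the materialized views ...
    }
    catch (...)
    {
        for (auto & source : sources)
            source->commit(/* success */ false, getCurrentExceptionMessage(true));
        file_iterator->releaseHoldBuckets();    // buckets are freed only after the failure is recorded
        throw;
    }
    for (auto & source : sources)
        source->commit(/* success */ true);
    file_iterator->releaseHoldBuckets();        // buckets are freed only after a successful commit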

@@ -1305,7 +1305,7 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(10):
for _ in range(30):
if (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == total_rows: