This commit is contained in:
kssenii 2023-07-24 00:31:37 +02:00
parent 4fc3d0b290
commit f207e5bf11
2 changed files with 5 additions and 0 deletions

View File

@ -49,6 +49,7 @@ void S3QueueHolder::S3QueueCollection::write(WriteBuffer & out) const
out << processed_file.timestamp << "\n";
out << processed_file.retries_count << "\n";
}
/// todo(kssenii): use a more flexible format?
}
String S3QueueHolder::S3QueueCollection::toString() const
@ -104,6 +105,7 @@ void S3QueueHolder::S3QueueProcessedCollection::add(const String & file_name)
{
files.erase(files.begin(), files.begin() + (files.size() - max_size));
}
/// todo(kssenii): use deque here
}
@ -177,6 +179,7 @@ S3QueueHolder::S3QueueHolder(
zkutil::ZooKeeperPtr S3QueueHolder::getZooKeeper() const
{
/// todo(kssenii): current_zookeeper is not updated at all apart from in constructor, remove the lock?
std::lock_guard lock(current_zookeeper_mutex);
return current_zookeeper;
}

View File

@ -81,6 +81,7 @@ StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
, glob_iterator(std::make_unique<StorageS3QueueSource::DisclosedGlobIterator>(
client_, globbed_uri_, query, virtual_header, context, read_keys_, request_settings_))
{
/// todo(kssenii): remove this loop, it should not be here
while (true)
{
KeyWithInfo val = glob_iterator->next();
@ -321,6 +322,7 @@ void StorageS3QueueSource::deleteProcessedObject(const String & file_path)
{
LOG_WARNING(log, "Delete processed file {} from bucket {}", file_path, bucket);
S3::DeleteObjectRequest request;
/// todo(kssenii): looks incorrect
String delete_key = file_path.substr(bucket.length() + 1);
request.WithKey(delete_key).WithBucket(bucket);