Mirror of https://github.com/ClickHouse/ClickHouse.git
Use S3 multi object delete
Something strange with the s3_with_proxy test
parent e0c7cf842b
commit c18cdec77c
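The gist of the change: instead of issuing one DeleteObject request per S3 key, DiskS3 now collects keys and removes them with DeleteObjects in chunks of at most 1000 keys (the limit documented for that API). Below is a minimal, standalone sketch of that pattern with the AWS SDK for C++; the client setup, bucket name, key list, and error handling are illustrative placeholders, not part of the commit.

#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/Delete.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ObjectIdentifier.h>

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        Aws::S3::S3Client client;                       /// default credentials/region, illustrative only
        const Aws::String bucket = "example-bucket";    /// hypothetical bucket name

        /// Keys to delete (in DiskS3 these come from the metadata files).
        std::vector<Aws::String> keys = {"data/a", "data/b", "data/c"};

        const std::size_t chunk_limit = 1000;           /// DeleteObjects accepts at most 1000 keys per request

        for (std::size_t pos = 0; pos < keys.size(); pos += chunk_limit)
        {
            /// Build one batch of ObjectIdentifiers.
            const std::size_t end = std::min(pos + chunk_limit, keys.size());
            Aws::Vector<Aws::S3::Model::ObjectIdentifier> chunk;
            for (std::size_t i = pos; i < end; ++i)
                chunk.push_back(Aws::S3::Model::ObjectIdentifier().WithKey(keys[i]));

            Aws::S3::Model::Delete delkeys;
            delkeys.SetObjects(chunk);

            /// One request deletes the whole batch.
            Aws::S3::Model::DeleteObjectsRequest request;
            request.SetBucket(bucket);
            request.SetDelete(delkeys);

            auto outcome = client.DeleteObjects(request);
            if (!outcome.IsSuccess())
                std::cerr << "DeleteObjects failed: " << outcome.GetError().GetMessage() << "\n";
        }
    }
    Aws::ShutdownAPI(options);
}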
@@ -18,7 +18,7 @@
 #include <Common/thread_local_rng.h>
 
 #include <aws/s3/model/CopyObjectRequest.h>
-#include <aws/s3/model/DeleteObjectRequest.h>
+#include <aws/s3/model/DeleteObjectsRequest.h>
 #include <aws/s3/model/GetObjectRequest.h>
 
 #include <boost/algorithm/string.hpp>
@@ -36,6 +36,30 @@ namespace ErrorCodes
     extern const int NOT_IMPLEMENTED;
 }
 
+class DiskS3::AwsS3KeyKeeper : public std::list<Aws::Vector<Aws::S3::Model::ObjectIdentifier>>
+{
+public:
+    void addKey(const String & key);
+
+private:
+    /// limit for one DeleteObject request
+    /// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
+    const static size_t chunk_limit = 1000;
+};
+
+void DiskS3::AwsS3KeyKeeper::addKey(const String & key)
+{
+    if (empty() || back().size() >= chunk_limit)
+    { /// add one more chunk
+        push_back(value_type());
+        back().reserve(chunk_limit);
+    }
+
+    Aws::S3::Model::ObjectIdentifier obj;
+    obj.SetKey(key);
+    back().push_back(obj);
+}
+
 namespace
 {
     String getRandomName()
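For reference, a simplified, self-contained illustration of the chunking behaviour AwsS3KeyKeeper implements: keys accumulate in the last chunk until it reaches chunk_limit, then a new chunk is started. This sketch uses plain std::string and a hard-coded key count purely for demonstration; it is not the class from the patch.

#include <iostream>
#include <list>
#include <string>
#include <vector>

int main()
{
    const size_t chunk_limit = 1000;                /// DeleteObjects accepts at most 1000 keys per request
    std::list<std::vector<std::string>> chunks;

    for (size_t i = 0; i < 2500; ++i)               /// 2500 hypothetical keys
    {
        if (chunks.empty() || chunks.back().size() >= chunk_limit)
        {
            /// add one more chunk
            chunks.emplace_back();
            chunks.back().reserve(chunk_limit);
        }
        chunks.back().push_back("data/key-" + std::to_string(i));
    }

    /// Expect 3 chunks: 1000 + 1000 + 500 keys.
    std::cout << "chunks: " << chunks.size() << "\n";
    for (const auto & chunk : chunks)
        std::cout << "  keys in chunk: " << chunk.size() << "\n";
}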
@@ -634,7 +658,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
     }
 }
 
-void DiskS3::remove(const String & path)
+void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys)
 {
     LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Remove file by path: {}", backQuote(metadata_path + path));
 
@@ -647,14 +671,9 @@ void DiskS3::remove(const String & path)
         if (metadata.ref_count == 0)
         {
             file.remove();
+
             for (const auto & [s3_object_path, _] : metadata.s3_objects)
-            {
-                /// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
-                Aws::S3::Model::DeleteObjectRequest request;
-                request.SetBucket(bucket);
-                request.SetKey(s3_root_path + s3_object_path);
-                throwIfError(client->DeleteObject(request));
-            }
+                keys.addKey(s3_root_path + s3_object_path);
         }
         else /// In other case decrement number of references, save metadata and delete file.
         {
@@ -665,25 +684,57 @@ void DiskS3::remove(const String & path)
     }
     else
         file.remove();
 }
 
-void DiskS3::removeRecursive(const String & path)
+void DiskS3::removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys)
 {
     checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
 
     Poco::File file(metadata_path + path);
     if (file.isFile())
     {
-        remove(path);
+        removeMeta(path, keys);
     }
     else
     {
         for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
-            removeRecursive(it->path());
+            removeMetaRecursive(it->path(), keys);
 
         file.remove();
     }
 }
 
+void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
+{
+    if (!keys.empty())
+    {
+        for (const auto & chunk : keys)
+        {
+            Aws::S3::Model::Delete delkeys;
+            delkeys.SetObjects(chunk);
+
+            /// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
+            Aws::S3::Model::DeleteObjectsRequest request;
+            request.SetBucket(bucket);
+            request.SetDelete(delkeys);
+            throwIfError(client->DeleteObjects(request));
+        }
+    }
+}
+
+void DiskS3::remove(const String & path)
+{
+    AwsS3KeyKeeper keys;
+    removeMeta(path, keys);
+    removeAws(keys);
+}
+
+void DiskS3::removeRecursive(const String & path)
+{
+    AwsS3KeyKeeper keys;
+    removeMetaRecursive(path, keys);
+    removeAws(keys);
+}
+
 bool DiskS3::tryReserve(UInt64 bytes)
 {
@@ -21,6 +21,8 @@ class DiskS3 : public IDisk
 public:
     friend class DiskS3Reservation;
 
+    class AwsS3KeyKeeper;
+
     DiskS3(
         String name_,
         std::shared_ptr<Aws::S3::S3Client> client_,
@@ -111,6 +113,10 @@ public:
 private:
     bool tryReserve(UInt64 bytes);
 
+    void removeMeta(const String & path, AwsS3KeyKeeper & keys);
+    void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys);
+    void removeAws(const AwsS3KeyKeeper & keys);
+
 private:
     const String name;
     std::shared_ptr<Aws::S3::S3Client> client;
@@ -10,7 +10,7 @@ logging.getLogger().addHandler(logging.StreamHandler())
 def check_proxy_logs(cluster, proxy_instance):
     logs = cluster.get_container_logs(proxy_instance)
     # Check that all possible interactions with Minio are present
-    for http_method in ["PUT", "GET", "DELETE"]:
+    for http_method in ["PUT", "GET", "POST"]:
         assert logs.find(http_method + " https://minio1") >= 0
 
 
@@ -36,7 +36,7 @@ def cluster():
     cluster.shutdown()
 
 
-def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET", "DELETE"}):
+def check_proxy_logs(cluster, proxy_instance, http_methods={"POST", "PUT", "GET"}):
     logs = cluster.get_container_logs(proxy_instance)
     # Check that all possible interactions with Minio are present
     for http_method in http_methods:
@@ -66,5 +66,5 @@ def test_s3_with_proxy_list(cluster, policy):
 
     node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
 
-    for proxy in ["proxy1", "proxy2"]:
-        check_proxy_logs(cluster, proxy, ["PUT", "GET", "DELETE"])
+    check_proxy_logs(cluster, "proxy1", ["PUT", "POST"])
+    check_proxy_logs(cluster, "proxy2", ["PUT", "POST", "GET"])