mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Extract S3 logic
This commit is contained in:
parent
fc6c1faa57
commit
eefbbf53e8
@ -45,6 +45,7 @@ if (BUILD_STANDALONE_KEEPER)
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperLogStore.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperServer.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperSnapshotManager.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperSnapshotManagerS3.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStateMachine.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStateManager.cpp
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStorage.cpp
|
||||
|
@ -9,23 +9,6 @@
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
#include <Core/UUID.h>
|
||||
|
||||
#include <IO/S3Common.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/S3/PocoHTTPClient.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/copyData.h>
|
||||
|
||||
#include <aws/core/auth/AWSCredentials.h>
|
||||
#include <aws/s3/S3Client.h>
|
||||
#include <aws/s3/S3Errors.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
#include <aws/s3/model/DeleteObjectRequest.h>
|
||||
#endif
|
||||
|
||||
#include <future>
|
||||
#include <chrono>
|
||||
@ -54,14 +37,9 @@ namespace ErrorCodes
|
||||
|
||||
KeeperDispatcher::KeeperDispatcher()
|
||||
: responses_queue(std::numeric_limits<size_t>::max())
|
||||
#if USE_AWS_S3
|
||||
, snapshots_s3_queue(std::numeric_limits<size_t>::max())
|
||||
#endif
|
||||
, configuration_and_settings(std::make_shared<KeeperConfigurationAndSettings>())
|
||||
, log(&Poco::Logger::get("KeeperDispatcher"))
|
||||
{
|
||||
}
|
||||
|
||||
{}
|
||||
|
||||
void KeeperDispatcher::requestThread()
|
||||
{
|
||||
@ -220,16 +198,11 @@ void KeeperDispatcher::snapshotThread()
|
||||
{
|
||||
[[maybe_unused]] auto snapshot_path = task.create_snapshot(std::move(task.snapshot));
|
||||
|
||||
#if USE_AWS_S3
|
||||
if (snapshot_path.empty())
|
||||
continue;
|
||||
|
||||
if (isLeader() && getSnapshotS3Client() != nullptr)
|
||||
{
|
||||
if (!snapshots_s3_queue.push(snapshot_path))
|
||||
LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", snapshot_path);
|
||||
}
|
||||
#endif
|
||||
if (isLeader())
|
||||
snapshot_s3.uploadSnapshot(snapshot_path);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -238,222 +211,6 @@ void KeeperDispatcher::snapshotThread()
|
||||
}
|
||||
}
|
||||
|
||||
#if USE_AWS_S3
|
||||
struct KeeperDispatcher::S3Configuration
|
||||
{
|
||||
S3Configuration(S3::URI uri_, S3::AuthSettings auth_settings_, std::shared_ptr<const Aws::S3::S3Client> client_)
|
||||
: uri(std::move(uri_))
|
||||
, auth_settings(std::move(auth_settings_))
|
||||
, client(std::move(client_))
|
||||
{}
|
||||
|
||||
S3::URI uri;
|
||||
S3::AuthSettings auth_settings;
|
||||
std::shared_ptr<const Aws::S3::S3Client> client;
|
||||
};
|
||||
|
||||
void KeeperDispatcher::updateS3Configuration(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
try
|
||||
{
|
||||
const std::string config_prefix = "keeper_server.s3_snapshot";
|
||||
|
||||
if (!config.has(config_prefix))
|
||||
{
|
||||
std::lock_guard client_lock{snapshot_s3_client_mutex};
|
||||
if (snapshot_s3_client)
|
||||
LOG_INFO(log, "S3 configuration was removed");
|
||||
snapshot_s3_client = nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
auto auth_settings = S3::AuthSettings::loadFromConfig(config_prefix, config);
|
||||
|
||||
auto endpoint = config.getString(config_prefix + ".endpoint");
|
||||
auto new_uri = S3::URI{Poco::URI(endpoint)};
|
||||
|
||||
std::unique_lock client_lock{snapshot_s3_client_mutex};
|
||||
|
||||
if (snapshot_s3_client && snapshot_s3_client->client && auth_settings == snapshot_s3_client->auth_settings
|
||||
&& snapshot_s3_client->uri.uri == new_uri.uri)
|
||||
return;
|
||||
|
||||
LOG_INFO(log, "S3 configuration was updated");
|
||||
|
||||
client_lock.unlock();
|
||||
|
||||
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
|
||||
HeaderCollection headers = auth_settings.headers;
|
||||
|
||||
static constexpr size_t s3_max_redirects = 10;
|
||||
static constexpr bool enable_s3_requests_logging = false;
|
||||
|
||||
if (!new_uri.key.empty())
|
||||
{
|
||||
LOG_ERROR(log, "Invalid endpoint defined for S3, it shouldn't contain key, endpoint: {}", endpoint);
|
||||
return;
|
||||
}
|
||||
|
||||
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
|
||||
auth_settings.region,
|
||||
RemoteHostFilter(), s3_max_redirects,
|
||||
enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false);
|
||||
|
||||
client_configuration.endpointOverride = new_uri.endpoint;
|
||||
|
||||
auto client = S3::ClientFactory::instance().create(
|
||||
client_configuration,
|
||||
new_uri.is_virtual_hosted_style,
|
||||
credentials.GetAWSAccessKeyId(),
|
||||
credentials.GetAWSSecretKey(),
|
||||
auth_settings.server_side_encryption_customer_key_base64,
|
||||
std::move(headers),
|
||||
auth_settings.use_environment_credentials.value_or(false),
|
||||
auth_settings.use_insecure_imds_request.value_or(false));
|
||||
|
||||
auto new_client = std::make_shared<KeeperDispatcher::S3Configuration>(std::move(new_uri), std::move(auth_settings), std::move(client));
|
||||
|
||||
client_lock.lock();
|
||||
snapshot_s3_client = std::move(new_client);
|
||||
client_lock.unlock();
|
||||
LOG_INFO(log, "S3 client was updated");
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_ERROR(log, "Failed to create an S3 client for snapshots");
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperDispatcher::snapshotS3Thread()
|
||||
{
|
||||
setThreadName("KeeperS3SnpT");
|
||||
|
||||
auto uuid = UUIDHelpers::generateV4();
|
||||
while (!shutdown_called)
|
||||
{
|
||||
std::string snapshot_path;
|
||||
if (!snapshots_s3_queue.pop(snapshot_path))
|
||||
break;
|
||||
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
try
|
||||
{
|
||||
auto s3_client = getSnapshotS3Client();
|
||||
if (s3_client == nullptr)
|
||||
continue;
|
||||
|
||||
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_path);
|
||||
ReadBufferFromFile snapshot_file(snapshot_path);
|
||||
|
||||
S3Settings::ReadWriteSettings read_write_settings;
|
||||
read_write_settings.upload_part_size_multiply_parts_count_threshold = 10000;
|
||||
|
||||
const auto create_writer = [&](const auto & key)
|
||||
{
|
||||
return WriteBufferFromS3
|
||||
{
|
||||
s3_client->client,
|
||||
s3_client->uri.bucket,
|
||||
key,
|
||||
read_write_settings
|
||||
};
|
||||
};
|
||||
|
||||
auto snapshot_name = fs::path(snapshot_path).filename().string();
|
||||
auto lock_file = fmt::format(".{}_LOCK", snapshot_name);
|
||||
|
||||
const auto file_exists = [&](const auto & key)
|
||||
{
|
||||
Aws::S3::Model::HeadObjectRequest request;
|
||||
request.SetBucket(s3_client->uri.bucket);
|
||||
request.SetKey(key);
|
||||
auto outcome = s3_client->client->HeadObject(request);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
return true;
|
||||
|
||||
const auto & error = outcome.GetError();
|
||||
if (error.GetErrorType() != Aws::S3::S3Errors::NO_SUCH_KEY && error.GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
|
||||
throw S3Exception(error.GetErrorType(), "Failed to verify existence of lock file: {}", error.GetMessage());
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
if (file_exists(snapshot_name))
|
||||
{
|
||||
LOG_ERROR(log, "Snapshot {} already exists", snapshot_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// First we need to verify that there isn't already a lock file for the snapshot we want to upload
|
||||
if (file_exists(lock_file))
|
||||
{
|
||||
LOG_ERROR(log, "Lock file for {} already, exists. Probably a different node is already uploading the snapshot", snapshot_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// We write our UUID to lock file
|
||||
LOG_DEBUG(log, "Trying to create a lock file");
|
||||
WriteBufferFromS3 lock_writer = create_writer(lock_file);
|
||||
writeUUIDText(uuid, lock_writer);
|
||||
lock_writer.finalize();
|
||||
|
||||
const auto read_lock_file = [&]() -> std::string
|
||||
{
|
||||
ReadBufferFromS3 lock_reader
|
||||
{
|
||||
s3_client->client,
|
||||
s3_client->uri.bucket,
|
||||
lock_file,
|
||||
"",
|
||||
1,
|
||||
{}
|
||||
};
|
||||
|
||||
std::string read_uuid;
|
||||
readStringUntilEOF(read_uuid, lock_reader);
|
||||
|
||||
return read_uuid;
|
||||
};
|
||||
|
||||
// We read back the written UUID, if it's the same we can upload the file
|
||||
auto read_uuid = read_lock_file();
|
||||
|
||||
if (read_uuid != toString(uuid))
|
||||
{
|
||||
LOG_ERROR(log, "Failed to create a lock file");
|
||||
continue;
|
||||
}
|
||||
|
||||
WriteBufferFromS3 snapshot_writer = create_writer(snapshot_name);
|
||||
copyData(snapshot_file, snapshot_writer);
|
||||
snapshot_writer.finalize();
|
||||
|
||||
LOG_INFO(log, "Successfully uploaded {} to S3", snapshot_path);
|
||||
|
||||
LOG_INFO(log, "Removing lock file");
|
||||
Aws::S3::Model::DeleteObjectRequest delete_request;
|
||||
delete_request.SetBucket(s3_client->uri.bucket);
|
||||
delete_request.SetKey(lock_file);
|
||||
auto delete_outcome = s3_client->client->DeleteObject(delete_request);
|
||||
|
||||
if (!delete_outcome.IsSuccess())
|
||||
throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType());
|
||||
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_INFO(log, "Failure during upload of {} to S3", snapshot_path);
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
|
||||
{
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
@ -539,9 +296,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
|
||||
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
|
||||
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
|
||||
|
||||
#if USE_AWS_S3
|
||||
snapshot_s3_thread = ThreadFromGlobalPool([this] { snapshotS3Thread(); });
|
||||
#endif
|
||||
snapshot_s3.startup();
|
||||
|
||||
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue);
|
||||
|
||||
@ -607,11 +362,7 @@ void KeeperDispatcher::shutdown()
|
||||
if (snapshot_thread.joinable())
|
||||
snapshot_thread.join();
|
||||
|
||||
#if USE_AWS_S3
|
||||
snapshots_s3_queue.finish();
|
||||
if (snapshot_s3_thread.joinable())
|
||||
snapshot_s3_thread.join();
|
||||
#endif
|
||||
snapshot_s3.shutdown();
|
||||
|
||||
update_configuration_queue.finish();
|
||||
if (update_configuration_thread.joinable())
|
||||
@ -803,14 +554,6 @@ void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, Keep
|
||||
requests_for_sessions.clear();
|
||||
}
|
||||
|
||||
#if USE_AWS_S3
|
||||
std::shared_ptr<KeeperDispatcher::S3Configuration> KeeperDispatcher::getSnapshotS3Client() const
|
||||
{
|
||||
std::lock_guard lock{snapshot_s3_client_mutex};
|
||||
return snapshot_s3_client;
|
||||
}
|
||||
#endif
|
||||
|
||||
int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
|
||||
{
|
||||
/// New session id allocation is a special request, because we cannot process it in normal
|
||||
@ -951,9 +694,7 @@ void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfigurati
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");
|
||||
}
|
||||
|
||||
#if USE_AWS_S3
|
||||
updateS3Configuration(config);
|
||||
#endif
|
||||
snapshot_s3.updateS3Configuration(config);
|
||||
}
|
||||
|
||||
void KeeperDispatcher::updateKeeperStatLatency(uint64_t process_time_ms)
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <Coordination/CoordinationSettings.h>
|
||||
#include <Coordination/Keeper4LWInfo.h>
|
||||
#include <Coordination/KeeperConnectionStats.h>
|
||||
#include <Coordination/KeeperSnapshotManagerS3.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -36,11 +37,6 @@ private:
|
||||
ResponsesQueue responses_queue;
|
||||
SnapshotsQueue snapshots_queue{1};
|
||||
|
||||
#if USE_AWS_S3
|
||||
using SnapshotBackupQueue = ConcurrentBoundedQueue<std::string>;
|
||||
SnapshotBackupQueue snapshots_s3_queue;
|
||||
#endif
|
||||
|
||||
/// More than 1k updates is definitely misconfiguration.
|
||||
UpdateConfigurationQueue update_configuration_queue{1000};
|
||||
|
||||
@ -67,10 +63,6 @@ private:
|
||||
ThreadFromGlobalPool session_cleaner_thread;
|
||||
/// Dumping new snapshots to disk
|
||||
ThreadFromGlobalPool snapshot_thread;
|
||||
#if USE_AWS_S3
|
||||
/// Upload new snapshots to S3
|
||||
ThreadFromGlobalPool snapshot_s3_thread;
|
||||
#endif
|
||||
/// Apply or wait for configuration changes
|
||||
ThreadFromGlobalPool update_configuration_thread;
|
||||
|
||||
@ -86,11 +78,7 @@ private:
|
||||
/// Counter for new session_id requests.
|
||||
std::atomic<int64_t> internal_session_id_counter{0};
|
||||
|
||||
#if USE_AWS_S3
|
||||
struct S3Configuration;
|
||||
mutable std::mutex snapshot_s3_client_mutex;
|
||||
std::shared_ptr<S3Configuration> snapshot_s3_client;
|
||||
#endif
|
||||
KeeperSnapshotManagerS3 snapshot_s3;
|
||||
|
||||
/// Thread put requests to raft
|
||||
void requestThread();
|
||||
@ -100,10 +88,6 @@ private:
|
||||
void sessionCleanerTask();
|
||||
/// Thread create snapshots in the background
|
||||
void snapshotThread();
|
||||
#if USE_AWS_S3
|
||||
/// Thread upload snapshots to S3 in the background
|
||||
void snapshotS3Thread();
|
||||
#endif
|
||||
/// Thread apply or wait configuration changes from leader
|
||||
void updateConfigurationThread();
|
||||
|
||||
@ -117,10 +101,6 @@ private:
|
||||
/// Clears both arguments
|
||||
void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
|
||||
|
||||
#if USE_AWS_S3
|
||||
std::shared_ptr<S3Configuration> getSnapshotS3Client() const;
|
||||
#endif
|
||||
|
||||
public:
|
||||
/// Just allocate some objects, real initialization is done by `intialize method`
|
||||
KeeperDispatcher();
|
||||
@ -146,9 +126,6 @@ public:
|
||||
/// Registered in ConfigReloader callback. Add new configuration changes to
|
||||
/// update_configuration_queue. Keeper Dispatcher apply them asynchronously.
|
||||
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
|
||||
#if USE_AWS_S3
|
||||
void updateS3Configuration(const Poco::Util::AbstractConfiguration & config);
|
||||
#endif
|
||||
|
||||
/// Shutdown internal keeper parts (server, state machine, log storage, etc)
|
||||
void shutdown();
|
||||
|
293
src/Coordination/KeeperSnapshotManagerS3.cpp
Normal file
293
src/Coordination/KeeperSnapshotManagerS3.cpp
Normal file
@ -0,0 +1,293 @@
|
||||
#include <Coordination/KeeperSnapshotManagerS3.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
#include <Core/UUID.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/setThreadName.h>
|
||||
|
||||
#include <IO/S3Common.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/S3/PocoHTTPClient.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/copyData.h>
|
||||
|
||||
#include <aws/core/auth/AWSCredentials.h>
|
||||
#include <aws/s3/S3Client.h>
|
||||
#include <aws/s3/S3Errors.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
#include <aws/s3/model/DeleteObjectRequest.h>
|
||||
|
||||
#include <filesystem>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct KeeperSnapshotManagerS3::S3Configuration
|
||||
{
|
||||
S3Configuration(S3::URI uri_, S3::AuthSettings auth_settings_, std::shared_ptr<const Aws::S3::S3Client> client_)
|
||||
: uri(std::move(uri_))
|
||||
, auth_settings(std::move(auth_settings_))
|
||||
, client(std::move(client_))
|
||||
{}
|
||||
|
||||
S3::URI uri;
|
||||
S3::AuthSettings auth_settings;
|
||||
std::shared_ptr<const Aws::S3::S3Client> client;
|
||||
};
|
||||
|
||||
KeeperSnapshotManagerS3::KeeperSnapshotManagerS3()
|
||||
: snapshots_s3_queue(std::numeric_limits<size_t>::max())
|
||||
, log(&Poco::Logger::get("KeeperSnapshotManagerS3"))
|
||||
{}
|
||||
|
||||
void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
try
|
||||
{
|
||||
const std::string config_prefix = "keeper_server.s3_snapshot";
|
||||
|
||||
if (!config.has(config_prefix))
|
||||
{
|
||||
std::lock_guard client_lock{snapshot_s3_client_mutex};
|
||||
if (snapshot_s3_client)
|
||||
LOG_INFO(log, "S3 configuration was removed");
|
||||
snapshot_s3_client = nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
auto auth_settings = S3::AuthSettings::loadFromConfig(config_prefix, config);
|
||||
|
||||
auto endpoint = config.getString(config_prefix + ".endpoint");
|
||||
auto new_uri = S3::URI{Poco::URI(endpoint)};
|
||||
|
||||
std::unique_lock client_lock{snapshot_s3_client_mutex};
|
||||
|
||||
if (snapshot_s3_client && snapshot_s3_client->client && auth_settings == snapshot_s3_client->auth_settings
|
||||
&& snapshot_s3_client->uri.uri == new_uri.uri)
|
||||
return;
|
||||
|
||||
LOG_INFO(log, "S3 configuration was updated");
|
||||
|
||||
client_lock.unlock();
|
||||
|
||||
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
|
||||
HeaderCollection headers = auth_settings.headers;
|
||||
|
||||
static constexpr size_t s3_max_redirects = 10;
|
||||
static constexpr bool enable_s3_requests_logging = false;
|
||||
|
||||
if (!new_uri.key.empty())
|
||||
{
|
||||
LOG_ERROR(log, "Invalid endpoint defined for S3, it shouldn't contain key, endpoint: {}", endpoint);
|
||||
return;
|
||||
}
|
||||
|
||||
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
|
||||
auth_settings.region,
|
||||
RemoteHostFilter(), s3_max_redirects,
|
||||
enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false);
|
||||
|
||||
client_configuration.endpointOverride = new_uri.endpoint;
|
||||
|
||||
auto client = S3::ClientFactory::instance().create(
|
||||
client_configuration,
|
||||
new_uri.is_virtual_hosted_style,
|
||||
credentials.GetAWSAccessKeyId(),
|
||||
credentials.GetAWSSecretKey(),
|
||||
auth_settings.server_side_encryption_customer_key_base64,
|
||||
std::move(headers),
|
||||
auth_settings.use_environment_credentials.value_or(false),
|
||||
auth_settings.use_insecure_imds_request.value_or(false));
|
||||
|
||||
auto new_client = std::make_shared<KeeperSnapshotManagerS3::S3Configuration>(std::move(new_uri), std::move(auth_settings), std::move(client));
|
||||
|
||||
client_lock.lock();
|
||||
snapshot_s3_client = std::move(new_client);
|
||||
client_lock.unlock();
|
||||
LOG_INFO(log, "S3 client was updated");
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_ERROR(log, "Failed to create an S3 client for snapshots");
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
std::shared_ptr<KeeperSnapshotManagerS3::S3Configuration> KeeperSnapshotManagerS3::getSnapshotS3Client() const
|
||||
{
|
||||
std::lock_guard lock{snapshot_s3_client_mutex};
|
||||
return snapshot_s3_client;
|
||||
}
|
||||
|
||||
void KeeperSnapshotManagerS3::snapshotS3Thread()
|
||||
{
|
||||
setThreadName("KeeperS3SnpT");
|
||||
|
||||
auto uuid = UUIDHelpers::generateV4();
|
||||
while (!shutdown_called)
|
||||
{
|
||||
std::string snapshot_path;
|
||||
if (!snapshots_s3_queue.pop(snapshot_path))
|
||||
break;
|
||||
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
try
|
||||
{
|
||||
auto s3_client = getSnapshotS3Client();
|
||||
if (s3_client == nullptr)
|
||||
continue;
|
||||
|
||||
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_path);
|
||||
ReadBufferFromFile snapshot_file(snapshot_path);
|
||||
|
||||
S3Settings::ReadWriteSettings read_write_settings;
|
||||
read_write_settings.upload_part_size_multiply_parts_count_threshold = 10000;
|
||||
|
||||
const auto create_writer = [&](const auto & key)
|
||||
{
|
||||
return WriteBufferFromS3
|
||||
{
|
||||
s3_client->client,
|
||||
s3_client->uri.bucket,
|
||||
key,
|
||||
read_write_settings
|
||||
};
|
||||
};
|
||||
|
||||
auto snapshot_name = fs::path(snapshot_path).filename().string();
|
||||
auto lock_file = fmt::format(".{}_LOCK", snapshot_name);
|
||||
|
||||
const auto file_exists = [&](const auto & key)
|
||||
{
|
||||
Aws::S3::Model::HeadObjectRequest request;
|
||||
request.SetBucket(s3_client->uri.bucket);
|
||||
request.SetKey(key);
|
||||
auto outcome = s3_client->client->HeadObject(request);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
return true;
|
||||
|
||||
const auto & error = outcome.GetError();
|
||||
if (error.GetErrorType() != Aws::S3::S3Errors::NO_SUCH_KEY && error.GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
|
||||
throw S3Exception(error.GetErrorType(), "Failed to verify existence of lock file: {}", error.GetMessage());
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
if (file_exists(snapshot_name))
|
||||
{
|
||||
LOG_ERROR(log, "Snapshot {} already exists", snapshot_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// First we need to verify that there isn't already a lock file for the snapshot we want to upload
|
||||
if (file_exists(lock_file))
|
||||
{
|
||||
LOG_ERROR(log, "Lock file for {} already, exists. Probably a different node is already uploading the snapshot", snapshot_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// We write our UUID to lock file
|
||||
LOG_DEBUG(log, "Trying to create a lock file");
|
||||
WriteBufferFromS3 lock_writer = create_writer(lock_file);
|
||||
writeUUIDText(uuid, lock_writer);
|
||||
lock_writer.finalize();
|
||||
|
||||
const auto read_lock_file = [&]() -> std::string
|
||||
{
|
||||
ReadBufferFromS3 lock_reader
|
||||
{
|
||||
s3_client->client,
|
||||
s3_client->uri.bucket,
|
||||
lock_file,
|
||||
"",
|
||||
1,
|
||||
{}
|
||||
};
|
||||
|
||||
std::string read_uuid;
|
||||
readStringUntilEOF(read_uuid, lock_reader);
|
||||
|
||||
return read_uuid;
|
||||
};
|
||||
|
||||
// We read back the written UUID, if it's the same we can upload the file
|
||||
auto read_uuid = read_lock_file();
|
||||
|
||||
if (read_uuid != toString(uuid))
|
||||
{
|
||||
LOG_ERROR(log, "Failed to create a lock file");
|
||||
continue;
|
||||
}
|
||||
|
||||
WriteBufferFromS3 snapshot_writer = create_writer(snapshot_name);
|
||||
copyData(snapshot_file, snapshot_writer);
|
||||
snapshot_writer.finalize();
|
||||
|
||||
LOG_INFO(log, "Successfully uploaded {} to S3", snapshot_path);
|
||||
|
||||
LOG_INFO(log, "Removing lock file");
|
||||
Aws::S3::Model::DeleteObjectRequest delete_request;
|
||||
delete_request.SetBucket(s3_client->uri.bucket);
|
||||
delete_request.SetKey(lock_file);
|
||||
auto delete_outcome = s3_client->client->DeleteObject(delete_request);
|
||||
|
||||
if (!delete_outcome.IsSuccess())
|
||||
throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType());
|
||||
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_INFO(log, "Failure during upload of {} to S3", snapshot_path);
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperSnapshotManagerS3::uploadSnapshot(const std::string & path)
|
||||
{
|
||||
if (getSnapshotS3Client() == nullptr)
|
||||
return;
|
||||
|
||||
if (!snapshots_s3_queue.push(path))
|
||||
LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", path);
|
||||
}
|
||||
|
||||
void KeeperSnapshotManagerS3::startup()
|
||||
{
|
||||
snapshot_s3_thread = ThreadFromGlobalPool([this] { snapshotS3Thread(); });
|
||||
}
|
||||
|
||||
void KeeperSnapshotManagerS3::shutdown()
|
||||
{
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
LOG_DEBUG(log, "Shutting down KeeperSnapshotManagerS3");
|
||||
shutdown_called = true;
|
||||
|
||||
try
|
||||
{
|
||||
snapshots_s3_queue.finish();
|
||||
if (snapshot_s3_thread.joinable())
|
||||
snapshot_s3_thread.join();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
|
||||
LOG_INFO(log, "KeeperSnapshotManagerS3 shut down");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
63
src/Coordination/KeeperSnapshotManagerS3.h
Normal file
63
src/Coordination/KeeperSnapshotManagerS3.h
Normal file
@ -0,0 +1,63 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/config.h>
|
||||
#include <config_core.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
#include <string>
|
||||
#endif
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
#if USE_AWS_S3
|
||||
class KeeperSnapshotManagerS3
|
||||
{
|
||||
public:
|
||||
KeeperSnapshotManagerS3();
|
||||
|
||||
void updateS3Configuration(const Poco::Util::AbstractConfiguration & config);
|
||||
void uploadSnapshot(const std::string & path);
|
||||
|
||||
void startup();
|
||||
void shutdown();
|
||||
private:
|
||||
using SnapshotS3Queue = ConcurrentBoundedQueue<std::string>;
|
||||
SnapshotS3Queue snapshots_s3_queue;
|
||||
|
||||
/// Upload new snapshots to S3
|
||||
ThreadFromGlobalPool snapshot_s3_thread;
|
||||
|
||||
struct S3Configuration;
|
||||
mutable std::mutex snapshot_s3_client_mutex;
|
||||
std::shared_ptr<S3Configuration> snapshot_s3_client;
|
||||
|
||||
std::atomic<bool> shutdown_called{false};
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
/// Thread upload snapshots to S3 in the background
|
||||
void snapshotS3Thread();
|
||||
std::shared_ptr<S3Configuration> getSnapshotS3Client() const;
|
||||
};
|
||||
#else
|
||||
class KeeperSnapshotManagerS3
|
||||
{
|
||||
public:
|
||||
KeeperSnapshotManagerS3() = default;
|
||||
|
||||
void updateS3Configuration(const Poco::Util::AbstractConfiguration &) {}
|
||||
void uploadSnapshot(const std::string &) {}
|
||||
|
||||
void startup() {}
|
||||
|
||||
void shutdown() {}
|
||||
};
|
||||
#endif
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user