From 44a3d6babef0736cbd9577a89084da9385e9e662 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 22 Sep 2022 13:03:27 +0000 Subject: [PATCH] Address PR comments --- src/Coordination/KeeperDispatcher.cpp | 3 +- src/Coordination/KeeperSnapshotManagerS3.cpp | 54 +++++++++++-------- src/Coordination/KeeperSnapshotManagerS3.h | 4 +- .../test_keeper_s3_snapshot/test.py | 4 ++ 4 files changed, 40 insertions(+), 25 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 177996d8ee8..1b265abbfb6 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -296,7 +296,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf responses_thread = ThreadFromGlobalPool([this] { responseThread(); }); snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); }); - snapshot_s3.startup(); + snapshot_s3.startup(config); server = std::make_unique(configuration_and_settings, config, responses_queue, snapshots_queue); @@ -325,7 +325,6 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf /// Start it after keeper server start session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); }); update_configuration_thread = ThreadFromGlobalPool([this] { updateConfigurationThread(); }); - updateConfiguration(config); LOG_DEBUG(log, "Dispatcher initialized"); } diff --git a/src/Coordination/KeeperSnapshotManagerS3.cpp b/src/Coordination/KeeperSnapshotManagerS3.cpp index 770d8dc94a6..4776b1f84c8 100644 --- a/src/Coordination/KeeperSnapshotManagerS3.cpp +++ b/src/Coordination/KeeperSnapshotManagerS3.cpp @@ -66,16 +66,16 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo auto endpoint = config.getString(config_prefix + ".endpoint"); auto new_uri = S3::URI{Poco::URI(endpoint)}; - std::unique_lock client_lock{snapshot_s3_client_mutex}; - - if (snapshot_s3_client && 
snapshot_s3_client->client && auth_settings == snapshot_s3_client->auth_settings - && snapshot_s3_client->uri.uri == new_uri.uri) - return; + { + std::lock_guard client_lock{snapshot_s3_client_mutex}; + // if client is not changed (same auth settings, same endpoint) we don't need to update + if (snapshot_s3_client && snapshot_s3_client->client && auth_settings == snapshot_s3_client->auth_settings + && snapshot_s3_client->uri.uri == new_uri.uri) + return; + } LOG_INFO(log, "S3 configuration was updated"); - client_lock.unlock(); - auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key); HeaderCollection headers = auth_settings.headers; @@ -108,9 +108,10 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo auto new_client = std::make_shared(std::move(new_uri), std::move(auth_settings), std::move(client)); - client_lock.lock(); - snapshot_s3_client = std::move(new_client); - client_lock.unlock(); + { + std::lock_guard client_lock{snapshot_s3_client_mutex}; + snapshot_s3_client = std::move(new_client); + } LOG_INFO(log, "S3 client was updated"); } catch (...) @@ -189,6 +190,7 @@ void KeeperSnapshotManagerS3::snapshotS3Thread() } // First we need to verify that there isn't already a lock file for the snapshot we want to upload + // Only leader uploads a snapshot, but there can be a rare case where we have 2 leaders in NuRaft if (file_exists(lock_file)) { LOG_ERROR(log, "Lock file for {} already, exists. 
Probably a different node is already uploading the snapshot", snapshot_name); @@ -228,21 +230,30 @@ void KeeperSnapshotManagerS3::snapshotS3Thread() continue; } + SCOPE_EXIT( + { + LOG_INFO(log, "Removing lock file"); + try + { + Aws::S3::Model::DeleteObjectRequest delete_request; + delete_request.SetBucket(s3_client->uri.bucket); + delete_request.SetKey(lock_file); + auto delete_outcome = s3_client->client->DeleteObject(delete_request); + if (!delete_outcome.IsSuccess()) + throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType()); + } + catch (...) + { + LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path); + tryLogCurrentException(__PRETTY_FUNCTION__); + } + }); + WriteBufferFromS3 snapshot_writer = create_writer(snapshot_name); copyData(snapshot_file, snapshot_writer); snapshot_writer.finalize(); LOG_INFO(log, "Successfully uploaded {} to S3", snapshot_path); - - LOG_INFO(log, "Removing lock file"); - Aws::S3::Model::DeleteObjectRequest delete_request; - delete_request.SetBucket(s3_client->uri.bucket); - delete_request.SetKey(lock_file); - auto delete_outcome = s3_client->client->DeleteObject(delete_request); - - if (!delete_outcome.IsSuccess()) - throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType()); - } catch (...) 
{ @@ -261,8 +272,9 @@ void KeeperSnapshotManagerS3::uploadSnapshot(const std::string & path) LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", path); } -void KeeperSnapshotManagerS3::startup() +void KeeperSnapshotManagerS3::startup(const Poco::Util::AbstractConfiguration & config) { + updateS3Configuration(config); snapshot_s3_thread = ThreadFromGlobalPool([this] { snapshotS3Thread(); }); } diff --git a/src/Coordination/KeeperSnapshotManagerS3.h b/src/Coordination/KeeperSnapshotManagerS3.h index 50464dabc2c..393f42696f9 100644 --- a/src/Coordination/KeeperSnapshotManagerS3.h +++ b/src/Coordination/KeeperSnapshotManagerS3.h @@ -24,7 +24,7 @@ public: void updateS3Configuration(const Poco::Util::AbstractConfiguration & config); void uploadSnapshot(const std::string & path); - void startup(); + void startup(const Poco::Util::AbstractConfiguration & config); void shutdown(); private: using SnapshotS3Queue = ConcurrentBoundedQueue; @@ -54,7 +54,7 @@ public: void updateS3Configuration(const Poco::Util::AbstractConfiguration &) {} void uploadSnapshot(const std::string &) {} - void startup() {} + void startup(const Poco::Util::AbstractConfiguration &) {} void shutdown() {} }; diff --git a/tests/integration/test_keeper_s3_snapshot/test.py b/tests/integration/test_keeper_s3_snapshot/test.py index 9f335998507..3e19bc4822c 100644 --- a/tests/integration/test_keeper_s3_snapshot/test.py +++ b/tests/integration/test_keeper_s3_snapshot/test.py @@ -77,6 +77,8 @@ def wait_node(node): def test_s3_upload(started_cluster): node1_zk = get_fake_zk(node1.name) + # we defined in configs snapshot_distance as 50 + # so after 50 requests we should generate a snapshot for _ in range(210): node1_zk.create("/test", sequence=True) @@ -99,6 +101,8 @@ def test_s3_upload(started_cluster): destroy_zk_client(node1_zk) node1.stop_clickhouse(kill=True) + # wait for new leader to be picked and that it continues + # uploading snapshots wait_node(node2) node2_zk = get_fake_zk(node2.name) for 
_ in range(200):