From 7fc60d84cc9e060428900ed06f74b15db7e8a337 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 4 Dec 2024 17:32:06 +0100 Subject: [PATCH] Better deletion of keeper metadata in s3queue --- .../ObjectStorageQueueMetadata.cpp | 159 ++++++++++++++++++ .../ObjectStorageQueueMetadata.h | 3 + .../ObjectStorageQueueMetadataFactory.cpp | 27 +-- .../ObjectStorageQueueMetadataFactory.h | 16 +- .../ObjectStorageQueueSettings.cpp | 4 +- .../StorageObjectStorageQueue.cpp | 9 +- .../integration/test_storage_s3_queue/test.py | 124 ++++++++++++++ 7 files changed, 314 insertions(+), 28 deletions(-) diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp index 6aac853b011..6f536d78ac0 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -438,6 +439,164 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper( "of wrong zookeeper path or because of logical error"); } +namespace +{ + struct Info + { + std::string hostname; + std::string table_id; + + bool operator ==(const Info & other) const + { + return hostname == other.hostname && table_id == other.table_id; + } + + static Info create(const StorageID & storage_id) + { + Info self; + self.hostname = DNSResolver::instance().getHostName(); + self.table_id = storage_id.hasUUID() ? toString(storage_id.uuid) : storage_id.getFullTableName(); + return self; + } + + std::string serialize() const + { + WriteBufferFromOwnString buf; + size_t version = 0; + buf << version << "\n"; + buf << hostname << "\n"; + buf << table_id << "\n"; + return buf.str(); + } + + static Info deserialize(const std::string & str) + { + ReadBufferFromString buf(str); + Info info; + size_t version; + buf >> version >> "\n"; + buf >> info.hostname >> "\n"; + buf >> info.table_id >> "\n"; + return info; + } + }; +} + +void ObjectStorageQueueMetadata::registerIfNot(const StorageID & storage_id) +{ + const auto registry_path = zookeeper_path / "registry"; + const auto self = Info::create(storage_id); + + Coordination::Error code; + for (size_t i = 0; i < 1000; ++i) + { + Coordination::Stat stat; + std::string registry_str; + auto zk_client = getZooKeeper(); + + bool node_exists = zk_client->tryGet(registry_path, registry_str, &stat); + if (node_exists) + { + Strings registered; + splitInto<','>(registered, registry_str); + + for (const auto & elem : registered) + { + if (elem.empty()) + continue; + + auto info = Info::deserialize(elem); + if (info == self) + { + LOG_TRACE(log, "Table {} is already registered", self.table_id); + return; + } + } + } + + LOG_TRACE(log, "Adding {} to registry", self.table_id); + + if (node_exists) + { + auto new_registry_str = registry_str + "," + self.serialize(); + code = zk_client->trySet(registry_path, new_registry_str, stat.version); + } + else + { + code = zk_client->tryCreate(registry_path, self.serialize(), zkutil::CreateMode::Persistent); + } + + if (code == Coordination::Error::ZOK) + return; + + if (code == Coordination::Error::ZBADVERSION + || code == Coordination::Error::ZSESSIONEXPIRED) + continue; + + throw zkutil::KeeperException(code); + } + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot register in keeper. Last error: {}", code); +} + +size_t ObjectStorageQueueMetadata::unregister(const StorageID & storage_id) +{ + const auto registry_path = zookeeper_path / "registry"; + const auto self = Info::create(storage_id); + + Coordination::Error code; + for (size_t i = 0; i < 1000; ++i) + { + Coordination::Stat stat; + std::string registry_str; + auto zk_client = getZooKeeper(); + + bool node_exists = zk_client->tryGet(registry_path, registry_str, &stat); + if (!node_exists) + { + LOG_WARNING(log, "Cannot unregister: registry does not exist"); + chassert(false); + return 0; + } + + Strings registered; + splitInto<','>(registered, registry_str); + + bool found = false; + std::string new_registry_str; + size_t count = 0; + for (const auto & elem : registered) + { + if (elem.empty()) + continue; + + auto info = Info::deserialize(elem); + if (info == self) + found = true; + else + { + if (!new_registry_str.empty()) + new_registry_str += ","; + new_registry_str += elem; + count += 1; + } + } + if (!found) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister: not registered"); + + code = zk_client->trySet(registry_path, new_registry_str, stat.version); + + if (code == Coordination::Error::ZOK) + return count; + + if (code == Coordination::Error::ZBADVERSION + || code == Coordination::Error::ZSESSIONEXPIRED) + continue; + + throw zkutil::KeeperException(code); + } + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister in keeper. Last error: {}", code); +} + void ObjectStorageQueueMetadata::cleanupThreadFunc() { /// A background task is responsible for maintaining diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h index 8da8a4c367d..aed610a8230 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h @@ -71,6 +71,9 @@ public: bool is_attach, LoggerPtr log); + void registerIfNot(const StorageID & storage_id); + size_t unregister(const StorageID & storage_id); + void shutdown(); FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {}); diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp index ba98711eff9..f9902e556f0 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp @@ -16,7 +16,8 @@ ObjectStorageQueueMetadataFactory & ObjectStorageQueueMetadataFactory::instance( ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFactory::getOrCreate( const std::string & zookeeper_path, - ObjectStorageQueueMetadataPtr metadata) + ObjectStorageQueueMetadataPtr metadata, + const StorageID & storage_id) { std::lock_guard lock(mutex); auto it = metadata_by_path.find(zookeeper_path); @@ -27,16 +28,16 @@ ObjectStorageQueueMetadataFactory::FilesMetadataPtr ObjectStorageQueueMetadataFa else { auto & metadata_from_table = metadata->getTableMetadata(); - auto & metadata_from_keeper = it->second.metadata->getTableMetadata(); + auto & metadata_from_keeper = it->second->getTableMetadata(); metadata_from_table.checkEquals(metadata_from_keeper); - - it->second.ref_count += 1; } - return it->second.metadata; + + it->second->registerIfNot(storage_id); + return it->second; } -void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path) +void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path, const StorageID & storage_id) { std::lock_guard lock(mutex); auto it = metadata_by_path.find(zookeeper_path); @@ -44,17 +45,19 @@ void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_pat if (it == metadata_by_path.end()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with zookeeper path {} does not exist", zookeeper_path); - chassert(it->second.ref_count > 0); - if (--it->second.ref_count == 0) + const size_t registry_size = it->second->unregister(storage_id); + LOG_TRACE(log, "Remaining registry size: {}", registry_size); + + if (registry_size == 0) { try { auto zk_client = Context::getGlobalContextInstance()->getZooKeeper(); - zk_client->tryRemove(it->first); + zk_client->removeRecursive(it->first); } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log); } metadata_by_path.erase(it); @@ -64,8 +67,8 @@ void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_pat std::unordered_map ObjectStorageQueueMetadataFactory::getAll() { std::unordered_map result; - for (const auto & [zk_path, metadata_and_ref_count] : metadata_by_path) - result.emplace(zk_path, metadata_and_ref_count.metadata); + for (const auto & [zk_path, metadata] : metadata_by_path) + result.emplace(zk_path, metadata); return result; } diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h index bd2455097ec..0b35d42a9a1 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h @@ -14,25 +14,19 @@ public: FilesMetadataPtr getOrCreate( const std::string & zookeeper_path, - ObjectStorageQueueMetadataPtr metadata); + ObjectStorageQueueMetadataPtr metadata, + const StorageID & storage_id); - void remove(const std::string & zookeeper_path); + void remove(const std::string & zookeeper_path, const StorageID & storage_id); std::unordered_map getAll(); private: - struct Metadata - { - explicit Metadata(std::shared_ptr metadata_) : metadata(metadata_), ref_count(1) {} - - std::shared_ptr metadata; - /// TODO: the ref count should be kept in keeper, because of the case with distributed processing. - size_t ref_count = 0; - }; - using MetadataByPath = std::unordered_map; + using MetadataByPath = std::unordered_map>; MetadataByPath metadata_by_path; std::mutex mutex; + LoggerPtr log = getLogger("QueueMetadataFactory"); }; } diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp index 47eb57f918d..060f1cd2dd5 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp @@ -30,8 +30,8 @@ namespace ErrorCodes DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \ DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \ DECLARE(UInt64, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \ - DECLARE(UInt64, polling_max_timeout_ms, 10 * 60 * 1000, "Maximum timeout before next polling", 0) \ - DECLARE(UInt64, polling_backoff_ms, 30 * 1000, "Polling backoff", 0) \ + DECLARE(UInt64, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \ + DECLARE(UInt64, polling_backoff_ms, 1000, "Polling backoff", 0) \ DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \ DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \ DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \ diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp index 6630d5a6c8f..eac03304999 100644 --- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp +++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp @@ -214,9 +214,12 @@ StorageObjectStorageQueue::StorageObjectStorageQueue( zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log); auto queue_metadata = std::make_unique( - zk_path, std::move(table_metadata), (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms], (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]); + zk_path, + std::move(table_metadata), + (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms], + (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]); - files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata)); + files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata), table_id_); task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); }); } @@ -248,7 +251,7 @@ void StorageObjectStorageQueue::shutdown(bool is_drop) void StorageObjectStorageQueue::drop() { - ObjectStorageQueueMetadataFactory::instance().remove(zk_path); + ObjectStorageQueueMetadataFactory::instance().remove(zk_path, getStorageID()); } bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index aecc1df491b..09972872ef3 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -2276,3 +2276,127 @@ def test_alter_settings(started_cluster): check_int_settings(node, int_settings) check_string_settings(node, string_settings) + + +def test_registry(started_cluster): + node1 = started_cluster.instances["node1"] + node2 = started_cluster.instances["node2"] + + table_name = f"test_registry_{uuid.uuid4().hex[:8]}" + db_name = f"db_{table_name}" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + files_to_generate = 1000 + + node1.query(f"DROP DATABASE IF EXISTS {db_name}") + node2.query(f"DROP DATABASE IF EXISTS {db_name}") + + node1.query( + f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node1')" + ) + node2.query( + f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node2')" + ) + + create_table( + started_cluster, + node1, + table_name, + "ordered", + files_path, + additional_settings={"keeper_path": keeper_path, "buckets": 3}, + database_name=db_name, + ) + + zk = started_cluster.get_kazoo_client("zoo1") + registry, stat = zk.get(f"{keeper_path}/registry/") + + uuid1 = node1.query( + f"SELECT uuid FROM system.tables WHERE database = '{db_name}' and table = '{table_name}'" + ).strip() + assert uuid1 in str(registry) + + expected = [f"0\\nnode1\\n{uuid1}\\n", f"0\\nnode2\\n{uuid1}\\n"] + + for elem in expected: + assert elem in str(registry) + + total_values = generate_random_files( + started_cluster, files_path, files_to_generate, start_ind=0, row_num=1 + ) + + create_mv(node1, f"{db_name}.{table_name}", dst_table_name) + create_mv(node2, f"{db_name}.{table_name}", dst_table_name) + + def get_count(): + return int( + node1.query( + f"SELECT count() FROM clusterAllReplicas(cluster, default.{dst_table_name})" + ) + ) + + expected_rows = files_to_generate + for _ in range(20): + if expected_rows == get_count(): + break + time.sleep(1) + assert expected_rows == get_count() + + table_name_2 = f"test_registry_{uuid.uuid4().hex[:8]}_2" + create_table( + started_cluster, + node1, + table_name_2, + "ordered", + files_path, + additional_settings={"keeper_path": keeper_path, "buckets": 3}, + database_name=db_name, + ) + + registry, stat = zk.get(f"{keeper_path}/registry/") + + uuid2 = node1.query( + f"SELECT uuid FROM system.tables WHERE database = '{db_name}' and table = '{table_name_2}'" + ).strip() + + assert uuid1 in str(registry) + assert uuid2 in str(registry) + + expected = [ + f"0\\nnode1\\n{uuid1}\\n", + f"0\\nnode2\\n{uuid1}\\n", + f"0\\nnode1\\n{uuid2}\\n", + f"0\\nnode2\\n{uuid2}\\n", + ] + + for elem in expected: + assert elem in str(registry) + + node1.restart_clickhouse() + node2.restart_clickhouse() + + registry, stat = zk.get(f"{keeper_path}/registry/") + + assert uuid1 in str(registry) + assert uuid2 in str(registry) + + node1.query(f"DROP TABLE {db_name}.{table_name_2} SYNC") + + assert zk.exists(keeper_path) is not None + registry, stat = zk.get(f"{keeper_path}/registry/") + + assert uuid1 in str(registry) + assert uuid2 not in str(registry) + + expected = [ + f"0\\nnode1\\n{uuid1}\\n", + f"0\\nnode2\\n{uuid1}\\n", + ] + + for elem in expected: + assert elem in str(registry) + + node1.query(f"DROP TABLE {db_name}.{table_name} SYNC") + + assert zk.exists(keeper_path) is None