From 373927d6a5a6b89a1ccb7b1207942dc4a5e8a39e Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 18 Sep 2024 19:25:18 +0200 Subject: [PATCH] Fix tracked_files_ttl_sec --- .../ObjectStorageQueueMetadata.cpp | 12 +++++++----- .../ObjectStorageQueueTableMetadata.cpp | 16 ++++++++-------- .../ObjectStorageQueueTableMetadata.h | 2 +- tests/integration/test_storage_s3_queue/test.py | 8 ++++++++ 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp index 8dbf51a9cf9..5814b60579a 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp @@ -122,7 +122,7 @@ ObjectStorageQueueMetadata::ObjectStorageQueueMetadata( , local_file_statuses(std::make_shared()) { if (mode == ObjectStorageQueueMode::UNORDERED - && (table_metadata.tracked_files_limit || table_metadata.tracked_file_ttl_sec)) + && (table_metadata.tracked_files_limit || table_metadata.tracked_files_ttl_sec)) { task = Context::getGlobalContextInstance()->getSchedulePool().createTask( "ObjectStorageQueueCleanupFunc", @@ -366,9 +366,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() return; } - chassert(table_metadata.tracked_files_limit || table_metadata.tracked_file_ttl_sec); + chassert(table_metadata.tracked_files_limit || table_metadata.tracked_files_ttl_sec); const bool check_nodes_limit = table_metadata.tracked_files_limit > 0; - const bool check_nodes_ttl = table_metadata.tracked_file_ttl_sec > 0; + const bool check_nodes_ttl = table_metadata.tracked_files_ttl_sec > 0; const bool nodes_limit_exceeded = nodes_num > table_metadata.tracked_files_limit; if ((!nodes_limit_exceeded || !check_nodes_limit) && !check_nodes_ttl) @@ -443,7 +443,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() wb << fmt::format("Node: {}, path: {}, timestamp: {};\n", node, metadata.file_path, metadata.last_processed_timestamp); return wb.str(); }; - LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", table_metadata.tracked_files_limit, table_metadata.tracked_file_ttl_sec, get_nodes_str()); + + LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", + table_metadata.tracked_files_limit, table_metadata.tracked_files_ttl_sec, get_nodes_str()); size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - table_metadata.tracked_files_limit : 0; for (const auto & node : sorted_nodes) @@ -464,7 +466,7 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() else if (check_nodes_ttl) { UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp; - if (node_age >= table_metadata.tracked_file_ttl_sec) + if (node_age >= table_metadata.tracked_files_ttl_sec) { LOG_TRACE(log, "Removing node at path {} ({}) because file ttl is reached", node.metadata.file_path, node.zk_path); diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp index 25d44ed9ddb..f329fdced02 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp @@ -45,7 +45,7 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata( , after_processing(engine_settings.after_processing.toString()) , mode(engine_settings.mode.toString()) , tracked_files_limit(engine_settings.tracked_files_limit) - , tracked_file_ttl_sec(engine_settings.tracked_file_ttl_sec) + , tracked_files_ttl_sec(engine_settings.tracked_file_ttl_sec) , buckets(engine_settings.buckets) , processing_threads_num(engine_settings.processing_threads_num) , last_processed_path(engine_settings.last_processed_path) @@ -59,7 +59,7 @@ String ObjectStorageQueueTableMetadata::toString() const json.set("after_processing", after_processing); json.set("mode", mode); json.set("tracked_files_limit", tracked_files_limit); - json.set("tracked_file_ttl_sec", tracked_file_ttl_sec); + json.set("tracked_files_ttl_sec", tracked_files_ttl_sec); json.set("processing_threads_num", processing_threads_num); json.set("buckets", buckets); json.set("format_name", format_name); @@ -100,7 +100,7 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(const Poco::JSO , after_processing(json->getValue("after_processing")) , mode(json->getValue("mode")) , tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0)) - , tracked_file_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "s3queue_", 0)) + , tracked_files_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "", getOrDefault(json, "tracked_file_ttl_sec", "s3queue_", 0))) , buckets(getOrDefault(json, "buckets", "", 0)) , processing_threads_num(getOrDefault(json, "processing_threads_num", "s3queue_", 1)) , last_processed_path(getOrDefault(json, "last_processed_file", "s3queue_", "")) @@ -142,18 +142,18 @@ void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectSto if (tracked_files_limit != from_zk.tracked_files_limit) throw Exception( ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in max set size. " + "Existing table metadata in ZooKeeper differs in `tracked_files_limit`. " "Stored in ZooKeeper: {}, local: {}", from_zk.tracked_files_limit, tracked_files_limit); - if (tracked_file_ttl_sec != from_zk.tracked_file_ttl_sec) + if (tracked_files_ttl_sec != from_zk.tracked_files_ttl_sec) throw Exception( ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in max set age. " + "Existing table metadata in ZooKeeper differs in `tracked_files_ttl_sec`. " "Stored in ZooKeeper: {}, local: {}", - from_zk.tracked_file_ttl_sec, - tracked_file_ttl_sec); + from_zk.tracked_files_ttl_sec, + tracked_files_ttl_sec); if (format_name != from_zk.format_name) throw Exception( diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h index f1cf6428160..6aa1897c0f0 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h @@ -23,7 +23,7 @@ struct ObjectStorageQueueTableMetadata const String after_processing; const String mode; const UInt64 tracked_files_limit; - const UInt64 tracked_file_ttl_sec; + const UInt64 tracked_files_ttl_sec; const UInt64 buckets; const UInt64 processing_threads_num; const String last_processed_path; diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index b75ad21f002..2d1152c7f8d 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -976,6 +976,14 @@ def test_max_set_age(started_cluster): ) ) + node.restart_clickhouse() + + expected_rows *= 2 + wait_for_condition(lambda: get_count() == expected_rows) + assert files_to_generate == int( + node.query(f"SELECT uniq(_path) from {dst_table_name}") + ) + def test_max_set_size(started_cluster): node = started_cluster.instances["instance"]