From 361b2f107b33e89c6b6694e420cb668bc50d160b Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 31 Jan 2024 21:28:18 +0100 Subject: [PATCH 1/5] Allow to define a starting point for s3queue ordered mode --- src/Storages/S3Queue/S3QueueFilesMetadata.cpp | 74 ++++++++++++------- src/Storages/S3Queue/S3QueueFilesMetadata.h | 4 + src/Storages/S3Queue/S3QueueSettings.h | 1 + src/Storages/S3Queue/StorageS3Queue.cpp | 13 +++- .../integration/test_storage_s3_queue/test.py | 50 +++++++++++++ 5 files changed, 113 insertions(+), 29 deletions(-) diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp index 61f6b7fe052..ac80ded5792 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -637,25 +637,31 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder "this could be a result of expired zookeeper session", path); } + void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder) +{ + auto processed_node_path = isShardedProcessing() + ? zookeeper_processed_path / toString(getProcessingIdForPath(holder->path)) + : zookeeper_processed_path; + + return setFileProcessedForOrderedModeImpl(holder->path, holder, processed_node_path); +} + +void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( + const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path) { /// Update a persistent node in /processed and remove ephemeral node from /processing. - const auto & path = holder->path; const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); const auto zk_client = getZooKeeper(); - auto processed_node = isShardedProcessing() - ? zookeeper_processed_path / toString(getProcessingIdForPath(path)) - : zookeeper_processed_path; - - LOG_TEST(log, "Setting file `{}` as processed", path); + LOG_TEST(log, "Setting file `{}` as processed (at {})", path, processed_node_path); while (true) { std::string res; Coordination::Stat stat; - bool exists = zk_client->tryGet(processed_node, res, &stat); + bool exists = zk_client->tryGet(processed_node_path, res, &stat); Coordination::Requests requests; if (exists) { @@ -664,39 +670,41 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt auto metadata = NodeMetadata::fromString(res); if (metadata.file_path >= path) { - /// Here we get in the case that maximum processed file is bigger than ours. - /// This is possible to achieve in case of parallel processing - /// but for local processing we explicitly disable parallel mode and do everything in a single thread - /// (see constructor of StorageS3Queue where s3queue_processing_threads_num is explicitly set to 1 in case of Ordered mode). - /// Nevertheless, in case of distributed processing we cannot do anything with parallelism. - /// What this means? - /// It means that in scenario "distributed processing + Ordered mode" - /// a setting s3queue_loading_retries will not work. It is possible to fix, it is in TODO. - - /// Return because there is nothing to change, - /// the max processed file is already bigger than ours. 
+                    LOG_TRACE(log, "File {} is already processed, current max processed file: {}", path, metadata.file_path);
                     return;
                 }
             }
-            requests.push_back(zkutil::makeSetRequest(processed_node, node_metadata, stat.version));
+            requests.push_back(zkutil::makeSetRequest(processed_node_path, node_metadata, stat.version));
         }
         else
         {
-            requests.push_back(zkutil::makeCreateRequest(processed_node, node_metadata, zkutil::CreateMode::Persistent));
+            requests.push_back(zkutil::makeCreateRequest(processed_node_path, node_metadata, zkutil::CreateMode::Persistent));
         }
 
         Coordination::Responses responses;
-        if (holder->remove(&requests, &responses))
+        if (holder)
         {
-            LOG_TEST(log, "Moved file `{}` to processed", path);
-            if (max_loading_retries)
-                zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
-            return;
+            if (holder->remove(&requests, &responses))
+            {
+                LOG_TEST(log, "Moved file `{}` to processed", path);
+                if (max_loading_retries)
+                    zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
+                return;
+            }
+        }
+        else
+        {
+            auto code = zk_client->tryMulti(requests, responses);
+            if (code == Coordination::Error::ZOK)
+                return;
         }
 
         /// Failed to update max processed node, retry.
         if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
+        {
+            LOG_TRACE(log, "Failed to update processed node ({}). Will retry.", magic_enum::enum_name(responses[0]->error));
             continue;
+        }
 
         LOG_WARNING(log, "Cannot set file ({}) as processed since processing node "
                     "with expected processing id does not exist, "
                     "this could be a result of expired zookeeper session", path);
     }
 }
@@ -705,6 +713,22 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
     }
 }
 
+void S3QueueFilesMetadata::setFileProcessed(const std::string & path, size_t shard_id)
+{
+    if (mode != S3QueueMode::ORDERED)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as processed only for Ordered mode");
+
+    if (isShardedProcessing())
+    {
+        for (const auto & processor : getProcessingIdsForShard(shard_id))
+            setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path / toString(processor));
+    }
+    else
+    {
+        setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path);
+    }
+}
+
 void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const String & exception_message)
 {
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);
diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h
index c83c6f20b92..9301ea7ceb8 100644
--- a/src/Storages/S3Queue/S3QueueFilesMetadata.h
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h
@@ -42,6 +42,7 @@ public:
     ~S3QueueFilesMetadata();
 
     void setFileProcessed(ProcessingNodeHolderPtr holder);
+    void setFileProcessed(const std::string & path, size_t shard_id);
 
     void setFileFailed(ProcessingNodeHolderPtr holder, const std::string & exception_message);
 
@@ -141,6 +142,9 @@ private:
     void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder);
     std::string getZooKeeperPathForShard(size_t shard_id) const;
 
+    void setFileProcessedForOrderedModeImpl(
+        const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path);
+
     enum class SetFileProcessingResult
     {
         Success,
diff --git a/src/Storages/S3Queue/S3QueueSettings.h b/src/Storages/S3Queue/S3QueueSettings.h
index d65b38f77f2..5c846b4f294 100644
--- a/src/Storages/S3Queue/S3QueueSettings.h
+++ b/src/Storages/S3Queue/S3QueueSettings.h
@@ -22,6 +22,7 @@ class ASTStorage;
     M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
     M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
     M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
+    M(String, s3queue_last_processed_path, "", "For Ordered mode. Files with smaller file name are considered already processed", 0) \
     M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
     M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
     M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp
index 23ef9aec980..5fc0d19ce0e 100644
--- a/src/Storages/S3Queue/StorageS3Queue.cpp
+++ b/src/Storages/S3Queue/StorageS3Queue.cpp
@@ -155,10 +155,6 @@ StorageS3Queue::StorageS3Queue(
     LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
     task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
 
-    /// Get metadata manager from S3QueueMetadataFactory,
-    /// it will increase the ref count for the metadata object.
-    /// The ref count is decreased when StorageS3Queue::drop() method is called.
-    files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
     try
     {
         createOrCheckMetadata(storage_metadata);
@@ -169,6 +165,11 @@ StorageS3Queue::StorageS3Queue(
         throw;
     }
 
+    /// Get metadata manager from S3QueueMetadataFactory,
+    /// it will increase the ref count for the metadata object.
+    /// The ref count is decreased when StorageS3Queue::drop() method is called.
+    files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
+
     if (files_metadata->isShardedProcessing())
     {
         if (!s3queue_settings->s3queue_current_shard_num.changed)
@@ -181,6 +182,10 @@ StorageS3Queue::StorageS3Queue(
             files_metadata->registerNewShard(s3queue_settings->s3queue_current_shard_num);
         }
     }
+    if (s3queue_settings->mode == S3QueueMode::ORDERED && !s3queue_settings->s3queue_last_processed_path.value.empty())
+    {
+        files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value, s3queue_settings->s3queue_current_shard_num);
+    }
 }
 
 void StorageS3Queue::startup()
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index 5e86b798bf7..8c20eb94fad 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -1283,3 +1283,53 @@ def test_settings_check(started_cluster):
         )
 
     node.query(f"DROP TABLE {table_name} SYNC")
+
+
+@pytest.mark.parametrize("processing_threads", [1, 5])
+def test_processed_file_setting(started_cluster, processing_threads):
+    node = started_cluster.instances["instance"]
+    table_name = f"test_processed_file_setting_{processing_threads}"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={
+            "keeper_path": keeper_path,
+            "s3queue_processing_threads_num": processing_threads,
+            "s3queue_last_processed_path": f"{files_path}/test_5.csv",
+        },
+    )
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+
+    create_mv(node, table_name, dst_table_name)
+
+    def get_count():
+        return int(node.query(f"SELECT count() FROM {dst_table_name}"))
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+
+    assert expected_rows == get_count()
+
+    node.restart_clickhouse()
+    time.sleep(10)
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+
+    assert expected_rows == get_count()

From f7a5f09a0fc76662114a33c60bc3eb0aaebd5f1c Mon Sep 17 00:00:00 2001
From: kssenii
Date: Wed, 31 Jan 2024 22:59:25 +0100
Subject: [PATCH 2/5] Add one more test

---
 .../integration/test_storage_s3_queue/test.py | 56 +++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index 8c20eb94fad..34146484ced 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -99,6 +99,7 @@ def started_cluster():
             main_configs=[
                 "configs/s3queue_log.xml",
             ],
+            stay_alive=True,
         )
 
         logging.info("Starting cluster...")
@@ -1333,3 +1334,58 @@ def test_processed_file_setting(started_cluster, processing_threads):
         time.sleep(1)
 
     assert expected_rows == get_count()
+
+
+@pytest.mark.parametrize("processing_threads", [1, 5])
+def test_processed_file_setting_distributed(started_cluster, processing_threads):
+    node = started_cluster.instances["instance"]
+    node_2 = started_cluster.instances["instance2"]
+    table_name = f"test_processed_file_setting_distributed_{processing_threads}"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    for instance in [node, node_2]:
+        create_table(
+            started_cluster,
+            instance,
+            table_name,
+            "ordered",
+            files_path,
+            additional_settings={
+                "keeper_path": keeper_path,
+                "s3queue_processing_threads_num": processing_threads,
+                "s3queue_last_processed_path": f"{files_path}/test_5.csv",
+                "s3queue_total_shards_num": 2,
+            },
+        )
+
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+
+    for instance in [node, node_2]:
+        create_mv(instance, table_name, dst_table_name)
+
+    def get_count():
+        query = f"SELECT count() FROM {dst_table_name}"
+        return int(node.query(query)) + int(node_2.query(query))
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
+
+    for instance in [node, node_2]:
+        instance.restart_clickhouse()
+
+    time.sleep(10)
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()

From b3418c506e8a8c3223d66e1f7ba04bbd980b6401 Mon Sep 17 00:00:00 2001
From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com>
Date: Thu, 1 Feb 2024 10:28:33 +0100
Subject: [PATCH 3/5] Update src/Storages/S3Queue/S3QueueSettings.h

Co-authored-by: Antonio Andelic
---
 src/Storages/S3Queue/S3QueueSettings.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Storages/S3Queue/S3QueueSettings.h b/src/Storages/S3Queue/S3QueueSettings.h
index 5c846b4f294..c26e973a1c0 100644
--- a/src/Storages/S3Queue/S3QueueSettings.h
+++ b/src/Storages/S3Queue/S3QueueSettings.h
@@ -22,7 +22,7 @@ class ASTStorage;
     M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
     M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
     M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
-    M(String, s3queue_last_processed_path, "", "For Ordered mode. Files with smaller file name are considered already processed", 0) \
+    M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have a lexicographically smaller file name are considered already processed", 0) \
     M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
     M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
     M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \

From 3a16427e002711005bcfa5b75bf7a1d301348e9f Mon Sep 17 00:00:00 2001
From: kssenii
Date: Thu, 1 Feb 2024 12:35:01 +0100
Subject: [PATCH 4/5] Fix test

---
 src/Storages/S3Queue/StorageS3Queue.cpp         | 1 -
 tests/integration/test_storage_s3_queue/test.py | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp
index 5fc0d19ce0e..0723205b544 100644
--- a/src/Storages/S3Queue/StorageS3Queue.cpp
+++ b/src/Storages/S3Queue/StorageS3Queue.cpp
@@ -161,7 +161,6 @@ StorageS3Queue::StorageS3Queue(
     }
     catch (...)
     {
-        S3QueueMetadataFactory::instance().remove(zk_path);
         throw;
     }
 
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index 34146484ced..2cb617b3872 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -541,7 +541,7 @@ def test_multiple_tables_meta_mismatch(started_cluster):
         )
     except QueryRuntimeException as e:
         assert (
-            "Metadata with the same `s3queue_zookeeper_path` was already created but with different settings"
+            "Existing table metadata in ZooKeeper differs in engine mode"
            in str(e)
         )
         failed = True

From fe829cac489ce09f80f8db7d9dfc6de27b9ae685 Mon Sep 17 00:00:00 2001
From: robot-clickhouse
Date: Thu, 1 Feb 2024 11:43:55 +0000
Subject: [PATCH 5/5] Automatic style fix

---
 tests/integration/test_storage_s3_queue/test.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index 2cb617b3872..810c4f29e9d 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -540,10 +540,7 @@ def test_multiple_tables_meta_mismatch(started_cluster):
             },
         )
     except QueryRuntimeException as e:
-        assert (
-            "Existing table metadata in ZooKeeper differs in engine mode"
-            in str(e)
-        )
+        assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
         failed = True
 
     assert failed is True