From 342755d35e9ecef55df306007f6f5b95b8d2b8db Mon Sep 17 00:00:00 2001
From: kssenii
Date: Thu, 14 Sep 2023 18:41:31 +0200
Subject: [PATCH] Better

---
 src/Storages/S3Queue/S3QueueFilesMetadata.cpp | 173 +++++++-
 src/Storages/S3Queue/S3QueueFilesMetadata.h   |  13 +-
 src/Storages/S3Queue/S3QueueSource.cpp        |   3 +
 src/Storages/S3Queue/StorageS3Queue.cpp       |  13 +-
 src/Storages/S3Queue/StorageS3Queue.h         |   2 -
 .../integration/test_storage_s3_queue/test.py | 371 ++++++++----------
 6 files changed, 357 insertions(+), 218 deletions(-)

diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
index 99c0924968c..d4c2c116a47 100644
--- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
@@ -1,8 +1,13 @@
+#include "Common/Exception.h"
+#include "Common/ZooKeeper/Types.h"
+#include "Interpreters/Context_fwd.h"
+#include "Storages/S3Queue/S3QueueSettings.h"
 #include "config.h"

 #if USE_AWS_S3
 #include
 #include
+#include
 #include
 #include
 #include
@@ -28,11 +33,22 @@ namespace
 {
     return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
 }
+
+    size_t generateRescheduleInterval()
+    {
+        /// Use a more or less random interval for the unordered mode cleanup task,
+        /// so that cleanup tasks of distributed replicas do not all run at the same time.
+        /// TODO: make the lower and upper boundaries configurable by settings
+        pcg64 rng(randomSeed());
+        /// Reschedule somewhere between 5 and 35 seconds.
+        return 5000 + rng() % 30000;
+    }
 }

 S3QueueFilesMetadata::S3QueueFilesMetadata(
     const StorageS3Queue * storage_,
-    const S3QueueSettings & settings_)
+    const S3QueueSettings & settings_,
+    ContextPtr context)
     : storage(storage_)
     , mode(settings_.mode)
     , max_set_size(settings_.s3queue_tracked_files_limit.value)
@@ -43,6 +59,27 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(
     , zookeeper_failed_path(storage->getZooKeeperPath() / "failed")
     , log(&Poco::Logger::get("S3QueueFilesMetadata"))
 {
+    if (mode == S3QueueMode::UNORDERED && (max_set_size || max_set_age_sec))
+    {
+        task = context->getSchedulePool().createTask("S3QueueCleanupFunc", [this] { cleanupThreadFunc(); });
+        task->activate();
+
+        auto schedule_ms = generateRescheduleInterval();
+        LOG_TEST(log, "Scheduling a cleanup task in {} ms", schedule_ms);
+        task->scheduleAfter(schedule_ms);
+    }
+}
+
+S3QueueFilesMetadata::~S3QueueFilesMetadata()
+{
+    deactivateCleanupTask();
+}
+
+void S3QueueFilesMetadata::deactivateCleanupTask()
+{
+    shutdown = true;
+    if (task)
+        task->deactivate();
 }

 std::string S3QueueFilesMetadata::NodeMetadata::toString() const
@@ -109,6 +146,10 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)

 bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
 {
+    /// Create an ephemeral node in /processing
+    /// if a corresponding node does not exist in failed/, processed/ or processing/.
+    /// Return false otherwise.
+
     const auto node_name = getNodeName(path);
     const auto node_metadata = createNodeMetadata(path).toString();
     const auto zk_client = storage->getZooKeeper();
@@ -125,6 +166,10 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::str

 bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
 {
+    /// Create an ephemeral node in /processing
+    /// if a corresponding node does not exist in failed/ or processing/ and the max processed file check is satisfied.
+    /// Return false otherwise.
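+    /// (Assumed from the ordered mode design, not shown in this hunk: the "max processed file check"
+    /// means the file path must compare greater than the max processed path recorded in /processed,
+    /// since ordered mode tracks only that single node instead of a per-file set.)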
+ const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); const auto zk_client = storage->getZooKeeper(); @@ -195,8 +240,7 @@ void S3QueueFilesMetadata::setFileProcessed(const String & path) void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(const String & path) { - /// List results in s3 are always returned in UTF-8 binary order. - /// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html) + /// Create a persistent node in /processed and remove ephemeral node from /processing. const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); @@ -337,6 +381,129 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc } } +void S3QueueFilesMetadata::cleanupThreadFunc() +{ + /// A background task is responsible for maintaining + /// max_set_size and max_set_age settings for `unordered` processing mode. + + if (shutdown) + return; + + try + { + cleanupThreadFuncImpl(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + if (shutdown) + return; + + task->scheduleAfter(generateRescheduleInterval()); +} + +void S3QueueFilesMetadata::cleanupThreadFuncImpl() +{ + chassert(max_set_size || max_set_age_sec); + + const bool check_nodes_limit = max_set_size > 0; + const bool check_nodes_ttl = max_set_age_sec > 0; + + const auto zk_client = storage->getZooKeeper(); + auto nodes = zk_client->getChildren(zookeeper_processed_path); + if (nodes.empty()) + { + LOG_TEST(log, "A set of nodes is empty"); + return; + } + + const bool nodes_limit_exceeded = nodes.size() > max_set_size; + if (!nodes_limit_exceeded && check_nodes_limit && !check_nodes_ttl) + { + LOG_TEST(log, "No limit exceeded"); + return; + } + + struct Node + { + std::string name; + NodeMetadata metadata; + }; + auto node_cmp = [](const Node & a, const Node & b) + { + return a.metadata.last_processed_timestamp < b.metadata.last_processed_timestamp; + }; + + /// Ordered in ascending order of timestamps. + std::set sorted_nodes(node_cmp); + + for (const auto & node : nodes) + { + try + { + std::string metadata_str; + if (zk_client->tryGet(zookeeper_processed_path / node, metadata_str)) + { + bool inserted = sorted_nodes.emplace(node, NodeMetadata::fromString(metadata_str)).second; + chassert(inserted); + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + + /// TODO add a zookeeper lock for cleanup + + LOG_TRACE(log, "Checking node limits"); + + size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes.size() - max_set_size : 0; + for (const auto & node : sorted_nodes) + { + if (nodes_to_remove) + { + auto path = zookeeper_processed_path / node.name; + LOG_TEST(log, "Removing node at path `{}` because max files limit is reached", path.string()); + + auto code = zk_client->tryRemove(path); + if (code == Coordination::Error::ZOK) + --nodes_to_remove; + else + LOG_ERROR(log, "Failed to remove a node `{}`", path.string()); + } + else if (check_nodes_ttl) + { + UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp; + if (node_age >= max_set_age_sec) + { + auto path = zookeeper_processed_path / node.name; + LOG_TEST(log, "Removing node at path `{}` because file ttl is reached", path.string()); + + auto code = zk_client->tryRemove(path); + if (code != Coordination::Error::ZOK) + LOG_ERROR(log, "Failed to remove a node `{}`", path.string()); + } + else if (!nodes_to_remove) + { + /// Nodes limit satisfied. 
+                /// Nodes ttl is satisfied as well: if the current node is under the ttl,
+                /// then so are all remaining nodes (because we iterate in ascending order of timestamps).
+                break;
+            }
+        }
+        else
+        {
+            /// Nodes limit and ttl are satisfied.
+            break;
+        }
+    }
+
+    LOG_TRACE(log, "Node limits check finished");
+}
+
 }

 #endif
diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h
index 302feab6028..b8e172bcd88 100644
--- a/src/Storages/S3Queue/S3QueueFilesMetadata.h
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include

 namespace fs = std::filesystem;
 namespace Poco { class Logger; }
@@ -17,7 +18,9 @@ class StorageS3Queue;
 class S3QueueFilesMetadata
 {
 public:
-    S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_);
+    S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_, ContextPtr context);
+
+    ~S3QueueFilesMetadata();

     bool trySetFileAsProcessing(const std::string & path);

@@ -25,6 +28,8 @@ public:

     void setFileFailed(const std::string & path, const std::string & exception_message);

+    void deactivateCleanupTask();
+
 private:
     const StorageS3Queue * storage;
     const S3QueueMode mode;
@@ -39,6 +44,9 @@ private:
     mutable std::mutex mutex;
     Poco::Logger * log;

+    std::atomic_bool shutdown = false;
+    BackgroundSchedulePool::TaskHolder task;
+
     bool trySetFileAsProcessingForOrderedMode(const std::string & path);
     bool trySetFileAsProcessingForUnorderedMode(const std::string & path);

@@ -59,6 +67,9 @@ private:
     };

     NodeMetadata createNodeMetadata(const std::string & path, const std::string & exception = "", size_t retries = 0);
+
+    void cleanupThreadFunc();
+    void cleanupThreadFuncImpl();
 };

 }
diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp
index 6704345ea59..3a834dae8d0 100644
--- a/src/Storages/S3Queue/S3QueueSource.cpp
+++ b/src/Storages/S3Queue/S3QueueSource.cpp
@@ -37,6 +37,9 @@ StorageS3QueueSource::FileIterator::FileIterator(

 StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::FileIterator::next()
 {
+    /// List results in S3 are always returned in UTF-8 binary order.
+ /// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html) + while (true) { KeyWithInfo val = glob_iterator->next(); diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index 53a79aa9cff..bbaf5ae5311 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -88,7 +88,7 @@ StorageS3Queue::StorageS3Queue( , s3queue_settings(std::move(s3queue_settings_)) , zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings)) , after_processing(s3queue_settings->after_processing) - , files_metadata(std::make_shared(this, *s3queue_settings)) + , files_metadata(std::make_shared(this, *s3queue_settings, context_)) , configuration{configuration_} , format_settings(format_settings_) , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms) @@ -138,8 +138,17 @@ void StorageS3Queue::startup() void StorageS3Queue::shutdown() { shutdown_called = true; + if (task) + { task->deactivate(); + } + + if (files_metadata) + { + files_metadata->deactivateCleanupTask(); + files_metadata.reset(); + } } bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const @@ -182,7 +191,7 @@ std::shared_ptr StorageS3Queue::createSource( { auto configuration_snapshot = updateConfigurationAndGetCopy(local_context); auto file_iterator = createFileIterator(local_context, query); - auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals()); + auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals()); auto internal_source = std::make_unique( read_from_format_info, configuration.format, getName(), local_context, format_settings, diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h index 5cdac607645..07f52c434de 100644 --- a/src/Storages/S3Queue/StorageS3Queue.h +++ b/src/Storages/S3Queue/StorageS3Queue.h @@ -1,9 +1,7 @@ #pragma once - #include "config.h" #if USE_AWS_S3 - #include #include #include diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index 9793f2b7191..dd59138d935 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -191,6 +191,7 @@ def create_table( files_path, format="column1 UInt32, column2 UInt32, column3 UInt32", additional_settings={}, + file_format="CSV", ): settings = { "s3queue_loading_retries": 0, @@ -204,7 +205,7 @@ def create_table( node.query(f"DROP TABLE IF EXISTS {table_name}") create_query = f""" CREATE TABLE {table_name} ({format}) - ENGINE = S3Queue('{url}', {AUTH}'CSV') + ENGINE = S3Queue('{url}', {AUTH}'{file_format}') SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} """ node.query(create_query) @@ -527,50 +528,52 @@ def test_streaming_to_many_views(started_cluster, mode): def test_multiple_tables_meta_mismatch(started_cluster): - files_path = f"test_meta" - bucket = started_cluster.minio_restricted_bucket - instance = started_cluster.instances["instance"] - table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + node = started_cluster.instances["instance"] + table_name = f"multiple_tables_meta_mismatch" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" - instance.query( - f""" - DROP TABLE IF EXISTS test.s3_queue; - - CREATE TABLE test.s3_queue 
({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = 'ordered', - keeper_path = '/clickhouse/test_meta'; - """ + create_table( + started_cluster, + node, + table_name, + "ordered", + files_path, + additional_settings={ + "keeper_path": keeper_path, + }, ) # check mode failed = False try: - instance.query( - f""" - CREATE TABLE test.s3_queue_copy ({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = 'unordered', - keeper_path = '/clickhouse/test_meta'; - """ + create_table( + started_cluster, + node, + f"{table_name}_copy", + "unordered", + files_path, + additional_settings={ + "keeper_path": keeper_path, + }, ) except QueryRuntimeException as e: assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e) failed = True + assert failed is True # check columns - table_format_copy = table_format + ", column4 UInt32" try: - instance.query( - f""" - CREATE TABLE test.s3_queue_copy ({table_format_copy}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = 'ordered', - keeper_path = '/clickhouse/test_meta'; - """ + create_table( + started_cluster, + node, + f"{table_name}_copy", + "ordered", + files_path, + format="column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32", + additional_settings={ + "keeper_path": keeper_path, + }, ) except QueryRuntimeException as e: assert ( @@ -583,172 +586,96 @@ def test_multiple_tables_meta_mismatch(started_cluster): # check format try: - instance.query( - f""" - CREATE TABLE test.s3_queue_copy ({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'TSV') - SETTINGS - mode = 'ordered', - keeper_path = '/clickhouse/test_meta'; - """ + create_table( + started_cluster, + node, + f"{table_name}_copy", + "ordered", + files_path, + format="column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32", + additional_settings={ + "keeper_path": keeper_path, + }, + file_format="TSV", ) except QueryRuntimeException as e: assert "Existing table metadata in ZooKeeper differs in format name" in str(e) failed = True + assert failed is True # create working engine - instance.query( - f""" - CREATE TABLE test.s3_queue_copy ({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = 'ordered', - keeper_path = '/clickhouse/test_meta'; - """ + create_table( + started_cluster, + node, + f"{table_name}_copy", + "ordered", + files_path, + additional_settings={ + "keeper_path": keeper_path, + }, ) -def test_max_set_age(started_cluster): - files_to_generate = 10 - max_age = 1 - files_path = f"test_multiple" - bucket = started_cluster.minio_restricted_bucket - instance = started_cluster.instances["instance"] - table_format = "column1 UInt32, column2 UInt32, column3 UInt32" - - instance.query( - f""" - DROP TABLE IF EXISTS test.s3_queue; - - CREATE TABLE test.s3_queue ({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = 'unordered', - keeper_path = '/clickhouse/test_set_age', - s3queue_tracked_files_limit = 10, - s3queue_tracked_file_ttl_sec = {max_age}; - """ - ) - - total_values = 
generate_random_files( - files_to_generate, files_path, started_cluster, bucket, row_num=1 - ) - get_query = f"SELECT * FROM test.s3_queue" - res1 = [ - list(map(int, l.split())) for l in run_query(instance, get_query).splitlines() - ] - assert res1 == total_values - time.sleep(max_age + 1) - - get_query = f"SELECT * FROM test.s3_queue" - res1 = [ - list(map(int, l.split())) for l in run_query(instance, get_query).splitlines() - ] - assert res1 == total_values - - @pytest.mark.parametrize("mode", AVAILABLE_MODES) def test_multiple_tables_streaming_sync(started_cluster, mode): + node = started_cluster.instances["instance"] + table_name = f"multiple_tables_streaming_sync_{mode}" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" files_to_generate = 300 poll_size = 30 - files_path = f"test_multiple_{mode}" - bucket = started_cluster.minio_restricted_bucket - instance = started_cluster.instances["instance"] - table_format = "column1 UInt32, column2 UInt32, column3 UInt32" - instance.query( - f""" - DROP TABLE IF EXISTS test.s3_queue; - DROP TABLE IF EXISTS test.s3_queue_copy; - DROP TABLE IF EXISTS test.s3_queue_copy_2; + for i in range(3): + table = f"{table_name}_{i + 1}" + dst_table = f"{dst_table_name}_{i + 1}" + create_table( + started_cluster, + node, + table, + mode, + files_path, + additional_settings={ + "s3queue_polling_size": poll_size, + "keeper_path": keeper_path, + }, + ) + create_mv(node, table, dst_table) - DROP TABLE IF EXISTS test.s3_queue_persistent; - DROP TABLE IF EXISTS test.s3_queue_persistent_copy; - DROP TABLE IF EXISTS test.s3_queue_persistent_copy_2; - - DROP TABLE IF EXISTS test.persistent_s3_queue_mv; - DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy; - DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy_2; - - CREATE TABLE test.s3_queue ({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = '{mode}', - keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}', - s3queue_polling_size = {poll_size}; - - CREATE TABLE test.s3_queue_copy ({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = '{mode}', - keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}', - s3queue_polling_size = {poll_size}; - - CREATE TABLE test.s3_queue_copy_2 ({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = '{mode}', - keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}', - s3queue_polling_size = {poll_size}; - - CREATE TABLE test.s3_queue_persistent ({table_format}) - ENGINE = MergeTree() - ORDER BY column1; - - CREATE TABLE test.s3_queue_persistent_copy ({table_format}) - ENGINE = MergeTree() - ORDER BY column1; - - CREATE TABLE test.s3_queue_persistent_copy_2 ({table_format}) - ENGINE = MergeTree() - ORDER BY column1; - - CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS - SELECT - * - FROM test.s3_queue; - - CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy TO test.s3_queue_persistent_copy AS - SELECT - * - FROM test.s3_queue_copy; - - CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy_2 TO test.s3_queue_persistent_copy_2 AS - SELECT - * - FROM test.s3_queue_copy_2; - """ - ) total_values = generate_random_files( - 
files_to_generate, files_path, started_cluster, bucket, row_num=1 + started_cluster, files_path, files_to_generate, row_num=1 ) def get_count(table_name): - return int(run_query(instance, f"SELECT count() FROM {table_name}")) + return int(run_query(node, f"SELECT count() FROM {table_name}")) for _ in range(100): if ( - get_count("test.s3_queue_persistent") - + get_count("test.s3_queue_persistent_copy") - + get_count("test.s3_queue_persistent_copy_2") + get_count(f"{dst_table_name}_1") + + get_count(f"{dst_table_name}_2") + + get_count(f"{dst_table_name}_3") ) == files_to_generate: break time.sleep(1) - get_query = f"SELECT * FROM test.s3_queue_persistent" res1 = [ - list(map(int, l.split())) for l in run_query(instance, get_query).splitlines() + list(map(int, l.split())) + for l in node.query( + f"SELECT column1, column2, column3 FROM {dst_table_name}_1" + ).splitlines() ] - get_query_copy = f"SELECT * FROM test.s3_queue_persistent_copy" res2 = [ list(map(int, l.split())) - for l in run_query(instance, get_query_copy).splitlines() + for l in node.query( + f"SELECT column1, column2, column3 FROM {dst_table_name}_2" + ).splitlines() ] - get_query_copy_2 = f"SELECT * FROM test.s3_queue_persistent_copy_2" res3 = [ list(map(int, l.split())) - for l in run_query(instance, get_query_copy_2).splitlines() + for l in node.query( + f"SELECT column1, column2, column3 FROM {dst_table_name}_3" + ).splitlines() ] assert {tuple(v) for v in res1 + res2 + res3} == set( [tuple(i) for i in total_values] @@ -757,54 +684,41 @@ def test_multiple_tables_streaming_sync(started_cluster, mode): # Checking that all files were processed only once time.sleep(10) assert ( - get_count("test.s3_queue_persistent") - + get_count("test.s3_queue_persistent_copy") - + get_count("test.s3_queue_persistent_copy_2") + get_count(f"{dst_table_name}_1") + + get_count(f"{dst_table_name}_2") + + get_count(f"{dst_table_name}_3") ) == files_to_generate @pytest.mark.parametrize("mode", AVAILABLE_MODES) def test_multiple_tables_streaming_sync_distributed(started_cluster, mode): - files_to_generate = 100 + node = started_cluster.instances["instance"] + node_2 = started_cluster.instances["instance2"] + table_name = f"multiple_tables_streaming_sync_distributed_{mode}" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + files_to_generate = 300 poll_size = 2 - files_path = f"test_multiple_{mode}" - bucket = started_cluster.minio_restricted_bucket - instance = started_cluster.instances["instance"] - instance_2 = started_cluster.instances["instance2"] - table_format = "column1 UInt32, column2 UInt32, column3 UInt32" - for inst in [instance, instance_2]: - inst.query( - f""" - DROP TABLE IF EXISTS test.s3_queue; - DROP TABLE IF EXISTS test.s3_queue_persistent; - DROP TABLE IF EXISTS test.persistent_s3_queue_mv; - - CREATE TABLE test.s3_queue ({table_format}) - ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV') - SETTINGS - mode = '{mode}', - keeper_path = '/clickhouse/test_multiple_consumers_{mode}', - s3queue_polling_size = {poll_size}; - - CREATE TABLE test.s3_queue_persistent ({table_format}) - ENGINE = MergeTree() - ORDER BY column1; - """ + for instance in [node, node_2]: + create_table( + started_cluster, + instance, + table_name, + mode, + files_path, + additional_settings={ + "s3queue_polling_size": poll_size, + "keeper_path": keeper_path, + }, ) - for inst in [instance, instance_2]: - inst.query( - 
f""" - CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS - SELECT - * - FROM test.s3_queue; - """ - ) + for instance in [node, node_2]: + create_mv(instance, table_name, dst_table_name) total_values = generate_random_files( - files_to_generate, files_path, started_cluster, bucket, row_num=1 + started_cluster, files_path, files_to_generate, row_num=1 ) def get_count(node, table_name): @@ -812,18 +726,15 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode): for _ in range(150): if ( - get_count(instance, "test.s3_queue_persistent") - + get_count(instance_2, "test.s3_queue_persistent") + get_count(node, dst_table_name) + get_count(node_2, dst_table_name) ) == files_to_generate: break time.sleep(1) - get_query = f"SELECT * FROM test.s3_queue_persistent" - res1 = [ - list(map(int, l.split())) for l in run_query(instance, get_query).splitlines() - ] + get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}" + res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()] res2 = [ - list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines() + list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines() ] assert len(res1) + len(res2) == files_to_generate @@ -837,11 +748,51 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode): # Checking that all files were processed only once time.sleep(10) assert ( - get_count(instance, "test.s3_queue_persistent") - + get_count(instance_2, "test.s3_queue_persistent") + get_count(node, dst_table_name) + get_count(node_2, dst_table_name) ) == files_to_generate +def test_max_set_age(started_cluster): + node = started_cluster.instances["instance"] + table_name = f"max_set_age" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + max_age = 1 + files_to_generate = 10 + + create_table( + started_cluster, + node, + table_name, + "unordered", + files_path, + additional_settings={ + "keeper_path": keeper_path, + "s3queue_tracked_files_limit": 10, + "s3queue_tracked_file_ttl_sec": max_age, + }, + ) + + node.wait_for_log_line("Checking node limits") + node.wait_for_log_line("Node limits check finished") + + total_values = generate_random_files( + started_cluster, files_path, files_to_generate, row_num=1 + ) + res1 = [ + list(map(int, l.split())) + for l in run_query(node, f"SELECT * FROM {table_name}").splitlines() + ] + assert res1 == total_values + time.sleep(max_age + 1) + + res1 = [ + list(map(int, l.split())) + for l in run_query(node, f"SELECT * FROM {table_name}").splitlines() + ] + assert res1 == total_values + + def test_max_set_size(started_cluster): files_to_generate = 10 files_path = f"test_multiple"