diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp
index 8dbf51a9cf9..979eee62719 100644
--- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp
+++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include <Common/getNumberOfPhysicalCPUCores.h>
 #include
 #include
 #include
@@ -218,6 +219,11 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
 {
     ObjectStorageQueueTableMetadata table_metadata(settings, columns, format);
 
+    if (!settings.processing_threads_num.changed && settings.processing_threads_num <= 1)
+    {
+        table_metadata.processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
+    }
+
     std::vector<String> metadata_paths;
     size_t buckets_num = 0;
     if (settings.mode == ObjectStorageQueueMode::ORDERED)
@@ -250,6 +256,7 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
 
         LOG_TRACE(log, "Metadata in keeper: {}", metadata_str);
 
+        table_metadata.adjustFromKeeper(metadata_from_zk);
         table_metadata.checkEquals(metadata_from_zk);
         return table_metadata;
     }
diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp
index 25d44ed9ddb..178cd049496 100644
--- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp
+++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.cpp
@@ -116,6 +116,18 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueTableMetadata::parse(const Str
     return ObjectStorageQueueTableMetadata(json);
 }
 
+void ObjectStorageQueueTableMetadata::adjustFromKeeper(const ObjectStorageQueueTableMetadata & from_zk)
+{
+    if (processing_threads_num != from_zk.processing_threads_num)
+    {
+        LOG_TRACE(getLogger("ObjectStorageQueueTableMetadata"),
+                  "Using `processing_threads_num` from keeper: {} (local: {})",
+                  from_zk.processing_threads_num, processing_threads_num);
+
+        processing_threads_num = from_zk.processing_threads_num;
+    }
+}
+
 void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const
 {
     checkImmutableFieldsEquals(from_zk);
diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h
index f1cf6428160..51547925f95 100644
--- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h
+++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h
@@ -25,7 +25,7 @@ struct ObjectStorageQueueTableMetadata
     const UInt64 tracked_files_limit;
     const UInt64 tracked_file_ttl_sec;
     const UInt64 buckets;
-    const UInt64 processing_threads_num;
+    UInt64 processing_threads_num; /// Can be changed from keeper.
     const String last_processed_path;
     const UInt64 loading_retries;
@@ -42,6 +42,8 @@ struct ObjectStorageQueueTableMetadata
 
     ObjectStorageQueueMode getMode() const;
 
+    void adjustFromKeeper(const ObjectStorageQueueTableMetadata & from_zk);
+
     void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
 
 private:
diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp
index b0393a33523..4e170b466b3 100644
--- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp
+++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp
@@ -22,7 +22,6 @@
 #include
 #include
 #include
-#include <Common/getNumberOfPhysicalCPUCores.h>
 #include
 #include
 
diff --git a/tests/integration/test_storage_s3_queue/configs/remote_servers.xml b/tests/integration/test_storage_s3_queue/configs/remote_servers.xml
new file mode 100644
index 00000000000..84d16206080
--- /dev/null
+++ b/tests/integration/test_storage_s3_queue/configs/remote_servers.xml
@@ -0,0 +1,16 @@
+<clickhouse>
+    <remote_servers>
+        <cluster>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster>
+    </remote_servers>
+</clickhouse>
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index b75ad21f002..96dd6d5a18e
100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -124,6 +124,26 @@ def started_cluster():
         with_installed_binary=True,
         use_old_analyzer=True,
     )
+    cluster.add_instance(
+        "node1",
+        with_zookeeper=True,
+        stay_alive=True,
+        main_configs=[
+            "configs/zookeeper.xml",
+            "configs/s3queue_log.xml",
+            "configs/remote_servers.xml",
+        ],
+    )
+    cluster.add_instance(
+        "node2",
+        with_zookeeper=True,
+        stay_alive=True,
+        main_configs=[
+            "configs/zookeeper.xml",
+            "configs/s3queue_log.xml",
+            "configs/remote_servers.xml",
+        ],
+    )
     cluster.add_instance(
         "instance_too_many_parts",
         user_configs=["configs/users.xml"],
@@ -215,6 +235,7 @@ def create_table(
     auth=DEFAULT_AUTH,
     bucket=None,
     expect_error=False,
+    database_name="default",
 ):
     auth_params = ",".join(auth)
     bucket = started_cluster.minio_bucket if bucket is None else bucket
@@ -236,7 +257,7 @@ def create_table(
     node.query(f"DROP TABLE IF EXISTS {table_name}")
 
     create_query = f"""
-        CREATE TABLE {table_name} ({format})
+        CREATE TABLE {database_name}.{table_name} ({format})
         ENGINE = {engine_def}
         SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
         """
@@ -514,7 +535,7 @@ def test_direct_select_multiple_files(started_cluster, mode):
         table_name,
         mode,
         files_path,
-        additional_settings={"keeper_path": keeper_path},
+        additional_settings={"keeper_path": keeper_path, "processing_threads_num": 3},
     )
     for i in range(5):
         rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
@@ -1867,3 +1888,58 @@ def test_commit_on_limit(started_cluster):
     for value in expected_failed:
         assert value not in processed
         assert value in failed
+
+
+def test_replicated(started_cluster):
+    node1 = started_cluster.instances["node1"]
+    node2 = started_cluster.instances["node2"]
+
+    table_name = f"test_replicated"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 1000
+
+    node1.query(
+        "CREATE DATABASE r ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node1')"
+    )
+    node2.query(
+        "CREATE DATABASE r ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node2')"
+    )
+
+    create_table(
+        started_cluster,
+        node1,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={
+            "keeper_path": keeper_path,
+        },
+        database_name="r",
+    )
+
+    assert '"processing_threads_num":16' in node1.query(
+        f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
+    )
+
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+
+    create_mv(node1, f"r.{table_name}", dst_table_name)
+    create_mv(node2, f"r.{table_name}", dst_table_name)
+
+    def get_count():
+        return int(
+            node1.query(
+                f"SELECT count() FROM clusterAllReplicas(cluster, default.{dst_table_name})"
+            )
+        )
+
+    expected_rows = files_to_generate
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()