From f9f85a0e8b639c1b953ebdb9ef239ae5799eb350 Mon Sep 17 00:00:00 2001
From: Alexander Tokmakov
Date: Wed, 24 Aug 2022 15:17:15 +0300
Subject: [PATCH] Revert "Parallel distributed insert select from *Cluster table functions (#39107)"

This reverts commit d3cc2349868de3add3885e8973e314446f4d219e.
---
 src/Interpreters/InterpreterInsertQuery.cpp |   2 +-
 src/Storages/HDFS/StorageHDFSCluster.cpp    |  33 +--
 src/Storages/HDFS/StorageHDFSCluster.h      |  12 +-
 src/Storages/IStorageCluster.h              |  28 ---
 src/Storages/StorageDistributed.cpp         | 175 ++++------
 src/Storages/StorageDistributed.h           |   4 -
 src/Storages/StorageReplicatedMergeTree.cpp | 105 ----------
 src/Storages/StorageReplicatedMergeTree.h   |   5 -
 src/Storages/StorageS3Cluster.cpp           |  32 +--
 src/Storages/StorageS3Cluster.h             |  12 +-
 .../test_s3_cluster/configs/cluster.xml     |  17 +-
 tests/integration/test_s3_cluster/test.py   | 189 +++---------
 12 files changed, 88 insertions(+), 526 deletions(-)
 delete mode 100644 src/Storages/IStorageCluster.h

diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp
index 4c7823ddc4e..7b6066575ae 100644
--- a/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/src/Interpreters/InterpreterInsertQuery.cpp
@@ -326,7 +326,7 @@ BlockIO InterpreterInsertQuery::execute()
     if (!query.table_function)
         getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
 
-    if (query.select && settings.parallel_distributed_insert_select)
+    if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
         // Distributed INSERT SELECT
         distributed_pipeline = table->distributedWrite(query, getContext());
 
diff --git a/src/Storages/HDFS/StorageHDFSCluster.cpp b/src/Storages/HDFS/StorageHDFSCluster.cpp
index 200c8cb3320..47a6fbf5eaa 100644
--- a/src/Storages/HDFS/StorageHDFSCluster.cpp
+++ b/src/Storages/HDFS/StorageHDFSCluster.cpp
@@ -41,7 +41,7 @@ StorageHDFSCluster::StorageHDFSCluster(
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
     const String & compression_method_)
-    : IStorageCluster(table_id_)
+    : IStorage(table_id_)
     , cluster_name(cluster_name_)
     , uri(uri_)
     , format_name(format_name_)
@@ -74,7 +74,13 @@ Pipe StorageHDFSCluster::read(
     size_t /*max_block_size*/,
     unsigned /*num_streams*/)
 {
-    createIteratorAndCallback(context);
+    auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
+
+    auto iterator = std::make_shared(context, uri);
+    auto callback = std::make_shared([iterator]() mutable -> String
+    {
+        return iterator->next();
+    });
 
     /// Calculate the header.
This is significant, because some columns could be thrown away in some cases like query with count(*) Block header = @@ -134,29 +140,6 @@ QueryProcessingStage::Enum StorageHDFSCluster::getQueryProcessingStage( } -void StorageHDFSCluster::createIteratorAndCallback(ContextPtr context) const -{ - cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef()); - - iterator = std::make_shared(context, uri); - callback = std::make_shared([iter = this->iterator]() mutable -> String { return iter->next(); }); -} - - -RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ContextPtr context) const -{ - createIteratorAndCallback(context); - return RemoteQueryExecutor::Extension{.task_iterator = callback}; -} - - -ClusterPtr StorageHDFSCluster::getCluster(ContextPtr context) const -{ - createIteratorAndCallback(context); - return cluster; -} - - NamesAndTypesList StorageHDFSCluster::getVirtuals() const { return NamesAndTypesList{ diff --git a/src/Storages/HDFS/StorageHDFSCluster.h b/src/Storages/HDFS/StorageHDFSCluster.h index 64b5fa86e05..21ae73c11ea 100644 --- a/src/Storages/HDFS/StorageHDFSCluster.h +++ b/src/Storages/HDFS/StorageHDFSCluster.h @@ -9,7 +9,6 @@ #include #include -#include #include namespace DB @@ -17,7 +16,7 @@ namespace DB class Context; -class StorageHDFSCluster : public IStorageCluster +class StorageHDFSCluster : public IStorage { public: StorageHDFSCluster( @@ -40,20 +39,11 @@ public: NamesAndTypesList getVirtuals() const override; - ClusterPtr getCluster(ContextPtr context) const override; - RemoteQueryExecutor::Extension getTaskIteratorExtension(ContextPtr context) const override; - private: String cluster_name; String uri; String format_name; String compression_method; - - mutable ClusterPtr cluster; - mutable std::shared_ptr iterator; - mutable std::shared_ptr callback; - - void createIteratorAndCallback(ContextPtr context) const; }; diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h deleted file mode 100644 index ecab7266153..00000000000 --- a/src/Storages/IStorageCluster.h +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace DB -{ - - -/** - * Base cluster for Storages used in table functions like s3Cluster and hdfsCluster - * Needed for code simplification around parallel_distributed_insert_select - */ -class IStorageCluster: public IStorage -{ -public: - - explicit IStorageCluster(const StorageID & table_id_) : IStorage(table_id_) {} - - virtual ClusterPtr getCluster(ContextPtr context) const = 0; - virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ContextPtr context) const = 0; - - bool isRemote() const override { return true; } -}; - - -} diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index cc55b2ae271..b3ea2cb9f5b 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -59,8 +59,6 @@ #include #include -#include - #include #include #include @@ -761,35 +759,55 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata } -std::optional StorageDistributed::distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr local_context) const +std::optional StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context) { - const auto & settings = local_context->getSettingsRef(); + QueryPipeline pipeline; + + const Settings & settings = 
local_context->getSettingsRef(); + if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth) + throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); + + std::shared_ptr storage_src; + auto & select = query.select->as(); auto new_query = std::dynamic_pointer_cast(query.clone()); - - /// Unwrap view() function. - if (src_distributed.remote_table_function_ptr) + if (select.list_of_selects->children.size() == 1) { - const TableFunctionPtr src_table_function = - TableFunctionFactory::instance().get(src_distributed.remote_table_function_ptr, local_context); - const TableFunctionView * view_function = - assert_cast(src_table_function.get()); - new_query->select = view_function->getSelectQuery().clone(); - } - else - { - const auto select_with_union_query = std::make_shared(); - select_with_union_query->list_of_selects = std::make_shared(); + if (auto * select_query = select.list_of_selects->children.at(0)->as()) + { + JoinedTables joined_tables(Context::createCopy(local_context), *select_query); - auto * select = query.select->as().list_of_selects->children.at(0)->as(); - auto new_select_query = std::dynamic_pointer_cast(select->clone()); - select_with_union_query->list_of_selects->children.push_back(new_select_query); + if (joined_tables.tablesCount() == 1) + { + storage_src = std::dynamic_pointer_cast(joined_tables.getLeftTableStorage()); + if (storage_src) + { + /// Unwrap view() function. + if (storage_src->remote_table_function_ptr) + { + const TableFunctionPtr src_table_function = + TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context); + const TableFunctionView * view_function = + assert_cast(src_table_function.get()); + new_query->select = view_function->getSelectQuery().clone(); + } + else + { + const auto select_with_union_query = std::make_shared(); + select_with_union_query->list_of_selects = std::make_shared(); - new_select_query->replaceDatabaseAndTable(src_distributed.getRemoteDatabaseName(), src_distributed.getRemoteTableName()); + auto new_select_query = std::dynamic_pointer_cast(select_query->clone()); + select_with_union_query->list_of_selects->children.push_back(new_select_query); - new_query->select = select_with_union_query; + new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName()); + + new_query->select = select_with_union_query; + } + } + } + } } - const Cluster::AddressesWithFailover & src_addresses = src_distributed.getCluster()->getShardsAddresses(); + const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{}; const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses(); /// Compare addresses instead of cluster name, to handle remote()/cluster(). /// (since for remote()/cluster() the getClusterName() is empty string) @@ -804,7 +822,7 @@ std::optional StorageDistributed::distributedWriteBetweenDistribu LOG_WARNING(log, "Parallel distributed INSERT SELECT is not possible " "(source cluster={} ({} addresses), destination cluster={} ({} addresses))", - src_distributed.getClusterName(), + storage_src ? 
storage_src->getClusterName() : "", src_addresses.size(), getClusterName(), dst_addresses.size()); @@ -831,7 +849,6 @@ std::optional StorageDistributed::distributedWriteBetweenDistribu new_query_str = buf.str(); } - QueryPipeline pipeline; ContextMutablePtr query_context = Context::createCopy(local_context); ++query_context->getClientInfo().distributed_depth; @@ -865,114 +882,6 @@ std::optional StorageDistributed::distributedWriteBetweenDistribu } -std::optional StorageDistributed::distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) const -{ - const auto & settings = local_context->getSettingsRef(); - auto extension = src_storage_cluster.getTaskIteratorExtension(local_context); - - auto dst_cluster = getCluster(); - - auto new_query = std::dynamic_pointer_cast(query.clone()); - if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL) - { - new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName()); - /// Reset table function for INSERT INTO remote()/cluster() - new_query->table_function.reset(); - } - - String new_query_str; - { - WriteBufferFromOwnString buf; - IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true); - ast_format_settings.always_quote_identifiers = true; - new_query->IAST::format(ast_format_settings); - new_query_str = buf.str(); - } - - QueryPipeline pipeline; - ContextMutablePtr query_context = Context::createCopy(local_context); - ++query_context->getClientInfo().distributed_depth; - - /// Here we take addresses from destination cluster and assume source table exists on these nodes - for (const auto & replicas : getCluster()->getShardsAddresses()) - { - /// There will be only one replica, because we consider each replica as a shard - for (const auto & node : replicas) - { - auto connection = std::make_shared( - node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(), - node.user, node.password, node.quota_key, node.cluster, node.cluster_secret, - "ParallelInsertSelectInititiator", - node.compression, - node.secure - ); - - auto remote_query_executor = std::make_shared( - connection, - new_query_str, - Block{}, - query_context, - /*throttler=*/nullptr, - Scalars{}, - Tables{}, - QueryProcessingStage::Complete, - extension); - - QueryPipeline remote_pipeline(std::make_shared(remote_query_executor, false, settings.async_socket_for_remote)); - remote_pipeline.complete(std::make_shared(remote_query_executor->getHeader())); - - pipeline.addCompletedPipeline(std::move(remote_pipeline)); - } - } - - return pipeline; -} - - -std::optional StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context) -{ - const Settings & settings = local_context->getSettingsRef(); - if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth) - throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); - - auto & select = query.select->as(); - - StoragePtr src_storage; - - if (select.list_of_selects->children.size() == 1) - { - if (auto * select_query = select.list_of_selects->children.at(0)->as()) - { - JoinedTables joined_tables(Context::createCopy(local_context), *select_query); - - if (joined_tables.tablesCount() == 1) - { - src_storage = joined_tables.getLeftTableStorage(); - } - } - } - - if (!src_storage) - return {}; - - if (auto src_distributed = std::dynamic_pointer_cast(src_storage)) - 
{ - return distributedWriteBetweenDistributedTables(*src_distributed, query, local_context); - } - else if (auto src_storage_cluster = std::dynamic_pointer_cast(src_storage)) - { - return distributedWriteFromClusterStorage(*src_storage_cluster, query, local_context); - } - else if (local_context->getClientInfo().distributed_depth == 0) - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. "\ - "Reason: distributed reading is supported only from Distributed engine or *Cluster table functions, but got {} storage", src_storage->getName()); - } - - return {}; -} - - void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const { auto name_deps = getDependentViewsByColumn(local_context); diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 3161f4b50f6..7cb25ae46ab 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include @@ -208,9 +207,6 @@ private: void delayInsertOrThrowIfNeeded() const; - std::optional distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context) const; - std::optional distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr context) const; - String remote_database; String remote_table; ASTPtr remote_table_function_ptr; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 393cdf76c7a..02b3422f7d2 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -45,13 +45,11 @@ #include #include -#include #include #include #include #include #include -#include #include #include #include @@ -61,7 +59,6 @@ #include #include #include -#include #include #include @@ -76,7 +73,6 @@ #include #include #include -#include #include #include @@ -163,7 +159,6 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int CONCURRENT_ACCESS_NOT_SUPPORTED; extern const int CHECKSUM_DOESNT_MATCH; - extern const int TOO_LARGE_DISTRIBUTED_DEPTH; } namespace ActionLocks @@ -4472,106 +4467,6 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con } -std::optional StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) -{ - const auto & settings = local_context->getSettingsRef(); - auto extension = src_storage_cluster->getTaskIteratorExtension(local_context); - - /// Here we won't check that the cluster formed from table replicas is a subset of a cluster specified in s3Cluster/hdfsCluster table function - auto src_cluster = src_storage_cluster->getCluster(local_context); - - /// Actually the query doesn't change, we just serialize it to string - String query_str; - { - WriteBufferFromOwnString buf; - IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true); - ast_format_settings.always_quote_identifiers = true; - query.IAST::format(ast_format_settings); - query_str = buf.str(); - } - - QueryPipeline pipeline; - ContextMutablePtr query_context = Context::createCopy(local_context); - ++query_context->getClientInfo().distributed_depth; - - for (const auto & replicas : src_cluster->getShardsAddresses()) - { - /// There will be only one replica, because we consider each replica as a 
shard - for (const auto & node : replicas) - { - auto connection = std::make_shared( - node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(), - node.user, node.password, node.quota_key, node.cluster, node.cluster_secret, - "ParallelInsertSelectInititiator", - node.compression, - node.secure - ); - - auto remote_query_executor = std::make_shared( - connection, - query_str, - Block{}, - query_context, - /*throttler=*/nullptr, - Scalars{}, - Tables{}, - QueryProcessingStage::Complete, - extension); - - QueryPipeline remote_pipeline(std::make_shared(remote_query_executor, false, settings.async_socket_for_remote)); - remote_pipeline.complete(std::make_shared(remote_query_executor->getHeader())); - - pipeline.addCompletedPipeline(std::move(remote_pipeline)); - } - } - - return pipeline; -} - -std::optional StorageReplicatedMergeTree::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context) -{ - /// Do not enable parallel distributed INSERT SELECT in case when query probably comes from another server - if (local_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY) - return {}; - - const Settings & settings = local_context->getSettingsRef(); - if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth) - throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); - - auto & select = query.select->as(); - - StoragePtr src_storage; - - if (select.list_of_selects->children.size() == 1) - { - if (auto * select_query = select.list_of_selects->children.at(0)->as()) - { - JoinedTables joined_tables(Context::createCopy(local_context), *select_query); - - if (joined_tables.tablesCount() == 1) - { - src_storage = joined_tables.getLeftTableStorage(); - } - } - } - - if (!src_storage) - return {}; - - if (auto src_distributed = std::dynamic_pointer_cast(src_storage)) - { - return distributedWriteFromClusterStorage(src_distributed, query, local_context); - } - else if (local_context->getClientInfo().distributed_depth == 0) - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. 
Reason: distributed "\ - "reading into Replicated table is supported only from *Cluster table functions, but got {} storage", src_storage->getName()); - } - - return {}; -} - - bool StorageReplicatedMergeTree::optimize( const ASTPtr &, const StorageMetadataPtr &, diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 3f03fb70f7a..d4eb49eba0d 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include @@ -137,8 +136,6 @@ public: SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override; - std::optional distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override; - bool optimize( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, @@ -468,8 +465,6 @@ private: std::mutex last_broken_disks_mutex; std::set last_broken_disks; - static std::optional distributedWriteFromClusterStorage(const std::shared_ptr & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context); - template void foreachActiveParts(Func && func, bool select_sequential_consistency) const; diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 0c5e69cb906..a3f368effa7 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -56,7 +56,7 @@ StorageS3Cluster::StorageS3Cluster( const ConstraintsDescription & constraints_, ContextPtr context_, const String & compression_method_) - : IStorageCluster(table_id_) + : IStorage(table_id_) , s3_configuration{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, {}, {}, S3Settings::ReadWriteSettings(context_->getSettingsRef())} , filename(filename_) , cluster_name(cluster_name_) @@ -105,7 +105,12 @@ Pipe StorageS3Cluster::read( unsigned /*num_streams*/) { StorageS3::updateS3Configuration(context, s3_configuration); - createIteratorAndCallback(query_info.query, context); + + auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef()); + + auto iterator = std::make_shared( + *s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context); + auto callback = std::make_shared([iterator]() mutable -> String { return iterator->next(); }); /// Calculate the header. 
This is significant, because some columns could be thrown away in some cases like query with count(*) Block header = @@ -165,29 +170,6 @@ QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage( } -void StorageS3Cluster::createIteratorAndCallback(ASTPtr query, ContextPtr context) const -{ - cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef()); - iterator = std::make_shared( - *s3_configuration.client, s3_configuration.uri, query, virtual_block, context); - callback = std::make_shared([iter = this->iterator]() mutable -> String { return iter->next(); }); -} - - -RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ContextPtr context) const -{ - createIteratorAndCallback(/*query=*/nullptr, context); - return RemoteQueryExecutor::Extension{.task_iterator = callback}; -} - - -ClusterPtr StorageS3Cluster::getCluster(ContextPtr context) const -{ - createIteratorAndCallback(/*query=*/nullptr, context); - return cluster; -} - - NamesAndTypesList StorageS3Cluster::getVirtuals() const { return virtual_columns; diff --git a/src/Storages/StorageS3Cluster.h b/src/Storages/StorageS3Cluster.h index e18c33e79da..f823d1fdf04 100644 --- a/src/Storages/StorageS3Cluster.h +++ b/src/Storages/StorageS3Cluster.h @@ -10,7 +10,6 @@ #include "Client/Connection.h" #include #include -#include #include namespace DB @@ -18,7 +17,7 @@ namespace DB class Context; -class StorageS3Cluster : public IStorageCluster +class StorageS3Cluster : public IStorage { public: StorageS3Cluster( @@ -43,22 +42,15 @@ public: NamesAndTypesList getVirtuals() const override; - RemoteQueryExecutor::Extension getTaskIteratorExtension(ContextPtr context) const override; - ClusterPtr getCluster(ContextPtr context) const override; private: StorageS3::S3Configuration s3_configuration; + String filename; String cluster_name; String format_name; String compression_method; NamesAndTypesList virtual_columns; Block virtual_block; - - mutable ClusterPtr cluster; - mutable std::shared_ptr iterator; - mutable std::shared_ptr callback; - - void createIteratorAndCallback(ASTPtr query, ContextPtr context) const; }; diff --git a/tests/integration/test_s3_cluster/configs/cluster.xml b/tests/integration/test_s3_cluster/configs/cluster.xml index 39275e99abd..18f15763633 100644 --- a/tests/integration/test_s3_cluster/configs/cluster.xml +++ b/tests/integration/test_s3_cluster/configs/cluster.xml @@ -20,23 +20,8 @@ - - - - - - s0_0_0 - 9000 - - - s0_0_1 - 9000 - - - - cluster_simple - + \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 2384aa6e059..2cbb36fcf06 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -34,24 +34,10 @@ def started_cluster(): try: cluster = ClickHouseCluster(__file__) cluster.add_instance( - "s0_0_0", - main_configs=["configs/cluster.xml"], - macros={"replica": "node1", "shard": "shard1"}, - with_minio=True, - with_zookeeper=True, - ) - cluster.add_instance( - "s0_0_1", - main_configs=["configs/cluster.xml"], - macros={"replica": "replica2", "shard": "shard1"}, - with_zookeeper=True, - ) - cluster.add_instance( - "s0_1_0", - main_configs=["configs/cluster.xml"], - macros={"replica": "replica1", "shard": "shard2"}, - with_zookeeper=True, + "s0_0_0", main_configs=["configs/cluster.xml"], with_minio=True ) + cluster.add_instance("s0_0_1", main_configs=["configs/cluster.xml"]) + cluster.add_instance("s0_1_0", 
main_configs=["configs/cluster.xml"]) logging.info("Starting cluster...") cluster.start() @@ -69,17 +55,17 @@ def test_select_all(started_cluster): pure_s3 = node.query( """ SELECT * from s3( - 'http://minio1:9001/root/data/{clickhouse,database}/*', - 'minio', 'minio123', 'CSV', - 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)""" ) # print(pure_s3) s3_distibuted = node.query( """ SELECT * from s3Cluster( - 'cluster_simple', - 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)""" ) # print(s3_distibuted) @@ -92,15 +78,15 @@ def test_count(started_cluster): pure_s3 = node.query( """ SELECT count(*) from s3( - 'http://minio1:9001/root/data/{clickhouse,database}/*', - 'minio', 'minio123', 'CSV', + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')""" ) # print(pure_s3) s3_distibuted = node.query( """ SELECT count(*) from s3Cluster( - 'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')""" ) @@ -139,13 +125,13 @@ def test_union_all(started_cluster): SELECT * FROM ( SELECT * from s3( - 'http://minio1:9001/root/data/{clickhouse,database}/*', - 'minio', 'minio123', 'CSV', - 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') UNION ALL SELECT * from s3( - 'http://minio1:9001/root/data/{clickhouse,database}/*', - 'minio', 'minio123', 'CSV', + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ) ORDER BY (name, value, polygon) @@ -157,13 +143,13 @@ def test_union_all(started_cluster): SELECT * FROM ( SELECT * from s3Cluster( - 'cluster_simple', - 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') UNION ALL SELECT * from s3Cluster( - 'cluster_simple', - 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ) ORDER BY (name, value, polygon) @@ -180,12 +166,12 @@ def test_wrong_cluster(started_cluster): """ SELECT count(*) from s3Cluster( 'non_existent_cluster', - 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') UNION ALL SELECT count(*) from s3Cluster( 
'non_existent_cluster', - 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') """ ) @@ -198,137 +184,14 @@ def test_ambiguous_join(started_cluster): result = node.query( """ SELECT l.name, r.value from s3Cluster( - 'cluster_simple', - 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as l JOIN s3Cluster( - 'cluster_simple', - 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as r ON l.name = r.name """ ) assert "AMBIGUOUS_COLUMN_NAME" not in result - - -def test_distributed_insert_select(started_cluster): - first_replica_first_shard = started_cluster.instances["s0_0_0"] - second_replica_first_shard = started_cluster.instances["s0_0_1"] - first_replica_second_shard = started_cluster.instances["s0_1_0"] - - first_replica_first_shard.query( - """ - CREATE TABLE insert_select_local ON CLUSTER 'cluster_simple' (a String, b UInt64) - ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select', '{replica}') - ORDER BY (a, b); - """ - ) - - first_replica_first_shard.query( - """ - CREATE TABLE insert_select_distributed ON CLUSTER 'cluster_simple' as insert_select_local - ENGINE = Distributed('cluster_simple', default, insert_select_local, b % 2); - """ - ) - - for file_number in range(100): - first_replica_first_shard.query( - """ - INSERT INTO TABLE FUNCTION s3('http://minio1:9001/root/data/generated/file_{}.csv', 'minio', 'minio123', 'CSV','a String, b UInt64') - SELECT repeat('{}', 10), number from numbers(100); - """.format( - file_number, file_number - ) - ) - - first_replica_first_shard.query( - """ - INSERT INTO insert_select_distributed SELECT * FROM s3Cluster( - 'cluster_simple', - 'http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64' - ) SETTINGS parallel_distributed_insert_select=1; - """ - ) - - for line in ( - first_replica_first_shard.query("""SELECT * FROM insert_select_local;""") - .strip() - .split("\n") - ): - _, b = line.split() - assert int(b) % 2 == 0 - - for line in ( - second_replica_first_shard.query("""SELECT * FROM insert_select_local;""") - .strip() - .split("\n") - ): - _, b = line.split() - assert int(b) % 2 == 0 - - for line in ( - first_replica_second_shard.query("""SELECT * FROM insert_select_local;""") - .strip() - .split("\n") - ): - _, b = line.split() - assert int(b) % 2 == 1 - - -def test_distributed_insert_select_with_replicated(started_cluster): - first_replica_first_shard = started_cluster.instances["s0_0_0"] - second_replica_first_shard = started_cluster.instances["s0_0_1"] - - first_replica_first_shard.query( - """ - CREATE TABLE insert_select_replicated_local ON CLUSTER 'first_shard' (a String, b UInt64) - ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}') - ORDER BY (a, b); - """ - ) - - for replica in [first_replica_first_shard, second_replica_first_shard]: - replica.query( - """ - SYSTEM STOP FETCHES; - """ - ) - replica.query( - """ - SYSTEM STOP MERGES; - """ - ) - 
- for file_number in range(100): - first_replica_first_shard.query( - """ - INSERT INTO TABLE FUNCTION s3('http://minio1:9001/root/data/generated_replicated/file_{}.csv', 'minio', 'minio123', 'CSV','a String, b UInt64') - SELECT repeat('{}', 10), number from numbers(100); - """.format( - file_number, file_number - ) - ) - - first_replica_first_shard.query( - """ - INSERT INTO insert_select_replicated_local SELECT * FROM s3Cluster( - 'first_shard', - 'http://minio1:9001/root/data/generated_replicated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64' - ) SETTINGS parallel_distributed_insert_select=1; - """ - ) - - first = int( - first_replica_first_shard.query( - """SELECT count(*) FROM insert_select_replicated_local""" - ).strip() - ) - second = int( - second_replica_first_shard.query( - """SELECT count(*) FROM insert_select_replicated_local""" - ).strip() - ) - - assert first != 0 - assert second != 0 - assert first + second == 100 * 100