From 31eddf607377fbda0f95028e9d0cda9def958fbe Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Wed, 13 Nov 2024 16:13:14 +0100 Subject: [PATCH 01/14] Replace table function engines with their -Cluster alternatives --- src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 1 + src/Storages/StorageFileCluster.cpp | 10 +-- src/Storages/StorageURLCluster.cpp | 10 +-- ...tTableFunctionArgumentsFromSelectQuery.cpp | 15 ++++- ...actTableFunctionArgumentsFromSelectQuery.h | 2 + src/TableFunctions/ITableFunctionCluster.h | 21 ++++-- .../TableFunctionObjectStorage.cpp | 64 +++++++++++++++++-- .../TableFunctionObjectStorage.h | 2 + src/TableFunctions/TableFunctionURL.cpp | 60 +++++++++++++++-- .../TableFunctionURLCluster.cpp | 28 ++++---- 11 files changed, 178 insertions(+), 38 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 907edf8cc18..41459404d08 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5643,6 +5643,9 @@ Parts virtually divided into segments to be distributed between replicas for par DECLARE(Bool, parallel_replicas_local_plan, true, R"( Build local plan for local replica )", BETA) \ + DECLARE(Bool, parallel_replicas_for_cluster_engines, true, R"( +Replace table function engines with their -Cluster alternatives +)", EXPERIMENTAL) \ \ DECLARE(Bool, allow_experimental_analyzer, true, R"( Allow new query analyzer. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 64913f03754..58a5e105111 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -60,6 +60,7 @@ static std::initializer_listchildren, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); + table_function, + storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), + format_name, + context + ); } RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 04fd5fb9675..8c288f6931a 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -86,12 +86,14 @@ StorageURLCluster::StorageURLCluster( void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { - ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); - if (!expression_list) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query)); + auto * table_function = extractTableFunctionFromSelectQuery(query); TableFunctionURLCluster::updateStructureAndFormatArgumentsIfNeeded( - expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); + table_function, + storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), + format_name, + context + ); } RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp index 382964d9fe1..dd4f05d1f56 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp @@ -1,7 +1,6 @@ #include #include -#include #include #include #include @@ -26,4 +25,18 @@ ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) return table_function->arguments->as(); } +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + auto * select_query = query->as(); + if (!select_query || !select_query->tables()) + return nullptr; + + auto * tables = select_query->tables()->as(); + auto * table_expression = tables->children[0]->as()->table_expression->as(); + if (!table_expression->table_function) + return nullptr; + + return table_expression->table_function->as(); +} + } diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h index af19ef656cc..39c19ebb28e 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h +++ b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h @@ -2,10 +2,12 @@ #include #include +#include namespace DB { ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); } diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index 6935ac39e79..a0c715e298e 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -2,10 +2,13 @@ #include #include +#include #include #include #include +#include + namespace DB { @@ -24,15 +27,23 @@ class ITableFunctionCluster : public Base public: String getName() const override = 0; - static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context) + static void updateStructureAndFormatArgumentsIfNeeded(ASTFunction * table_function, const String & structure_, const String & format_, const ContextPtr & context) { + auto * expression_list = table_function->arguments->as(); + ASTs args = expression_list->children; + if (args.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected empty list of arguments for {}Cluster table function", Base::name); - ASTPtr cluster_name_arg = args.front(); - args.erase(args.begin()); - Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); - args.insert(args.begin(), cluster_name_arg); + if (table_function-> name == Base::name) + Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); + else + { + ASTPtr cluster_name_arg = args.front(); + args.erase(args.begin()); + Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); + args.insert(args.begin(), cluster_name_arg); + } } protected: diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 12de08afad0..2f333b53669 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -1,5 +1,8 @@ #include "config.h" +#include +#include + #include #include #include @@ -19,11 +22,20 @@ #include #include #include +#include namespace DB { +namespace Setting +{ + extern const SettingsBool use_parallel_replicas; + extern const SettingsBool parallel_replicas_for_cluster_engines; + extern const SettingsString cluster_for_parallel_replicas; + extern const SettingsParallelReplicasMode parallel_replicas_mode; +} + namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; @@ -100,8 +112,9 @@ StoragePtr TableFunctionObjectStorage::executeImpl( ColumnsDescription cached_columns, bool is_insert_query) const { - ColumnsDescription columns; chassert(configuration); + ColumnsDescription columns; + if (configuration->structure != "auto") columns = parseColumnsListFromString(configuration->structure, context); else if (!structure_hint.empty()) @@ -109,18 +122,61 @@ StoragePtr TableFunctionObjectStorage::executeImpl( else if (!cached_columns.empty()) columns = cached_columns; - StoragePtr storage = std::make_shared( + StoragePtr storage; + + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + storage = std::make_shared( + configuration, + getObjectStorage(context, !is_insert_query), + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + /* comment */ String{}, + /* format_settings */ std::nullopt, + /* mode */ LoadingStrictnessLevel::CREATE, + /* distributed_processing */ true, + /* partition_by */ nullptr); + + storage->startup(); + return storage; + } + + const auto & settings = context->getSettingsRef(); + auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); + auto can_use_parallel_replicas = settings[Setting::use_parallel_replicas] + && settings[Setting::parallel_replicas_for_cluster_engines] + && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS + && !parallel_replicas_cluster_name.empty(); + + if (can_use_parallel_replicas) + { + storage = std::make_shared( + parallel_replicas_cluster_name, + configuration, + getObjectStorage(context, !is_insert_query), + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + context); + + storage->startup(); + return storage; + } + + storage = std::make_shared( configuration, getObjectStorage(context, !is_insert_query), context, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, - String{}, + /* comment */ String{}, /* format_settings */ std::nullopt, /* mode */ LoadingStrictnessLevel::CREATE, /* distributed_processing */ false, - nullptr); + /* partition_by */ nullptr); storage->startup(); return storage; diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 19cd637bd80..9c78342f611 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -6,8 +6,10 @@ #include #include #include + #include "config.h" + namespace DB { diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index 8f4841a992b..08c2aec30c3 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -2,22 +2,34 @@ #include "registerTableFunctions.h" #include +#include +#include +#include +#include #include +#include #include #include #include #include +#include #include -#include -#include -#include -#include #include #include + + namespace DB { +namespace Setting +{ + extern const SettingsBool use_parallel_replicas; + extern const SettingsBool parallel_replicas_for_cluster_engines; + extern const SettingsString cluster_for_parallel_replicas; + extern const SettingsParallelReplicasMode parallel_replicas_mode; +} + std::vector TableFunctionURL::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const { auto & table_function_node = query_node_table_function->as(); @@ -120,6 +132,46 @@ StoragePtr TableFunctionURL::getStorage( const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, const std::string & table_name, const String & compression_method_) const { + if (global_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + return std::make_shared( + source, + StorageID(getDatabaseName(), table_name), + format_, + std::nullopt /*format settings*/, + columns, + ConstraintsDescription{}, + String{}, + global_context, + compression_method_, + configuration.headers, + configuration.http_method, + nullptr, + /*distributed_processing*/ true); + } + + const auto & settings = global_context->getSettingsRef(); + auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); + auto can_use_parallel_replicas = settings[Setting::use_parallel_replicas] + && settings[Setting::parallel_replicas_for_cluster_engines] + && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS + && !parallel_replicas_cluster_name.empty(); + + if (can_use_parallel_replicas) + { + LOG_DEBUG(&Poco::Logger::get("TableFunctionURL"), "TableFunctionURL::getStorage wrapped to cluster version"); + return std::make_shared( + global_context, + settings[Setting::cluster_for_parallel_replicas], + filename, + format, + compression_method, + StorageID(getDatabaseName(), table_name), + getActualTableStructure(global_context, /* is_insert_query */ true), + ConstraintsDescription{}, + configuration); + } + return std::make_shared( source, StorageID(getDatabaseName(), table_name), diff --git a/src/TableFunctions/TableFunctionURLCluster.cpp b/src/TableFunctions/TableFunctionURLCluster.cpp index 5fd3c3342a5..f70b7f5c882 100644 --- a/src/TableFunctions/TableFunctionURLCluster.cpp +++ b/src/TableFunctions/TableFunctionURLCluster.cpp @@ -10,11 +10,10 @@ StoragePtr TableFunctionURLCluster::getStorage( const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context, const std::string & table_name, const String & /*compression_method_*/) const { - StoragePtr storage; if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) { //On worker node this uri won't contain globs - storage = std::make_shared( + return std::make_shared( filename, StorageID(getDatabaseName(), table_name), format, @@ -29,20 +28,17 @@ StoragePtr TableFunctionURLCluster::getStorage( nullptr, /*distributed_processing=*/ true); } - else - { - storage = std::make_shared( - context, - cluster_name, - filename, - format, - compression_method, - StorageID(getDatabaseName(), table_name), - getActualTableStructure(context, /* is_insert_query */ true), - ConstraintsDescription{}, - configuration); - } - return storage; + + return std::make_shared( + context, + cluster_name, + filename, + format, + compression_method, + StorageID(getDatabaseName(), table_name), + getActualTableStructure(context, /* is_insert_query */ true), + ConstraintsDescription{}, + configuration); } void registerTableFunctionURLCluster(TableFunctionFactory & factory) From 47bae80a391f1301f36290c5d3093cf91a97c9b0 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Wed, 13 Nov 2024 16:21:08 +0100 Subject: [PATCH 02/14] Lint --- src/Storages/StorageFileCluster.cpp | 5 ----- src/Storages/StorageURLCluster.cpp | 6 +----- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 8aec54cb6b2..466ab98bf45 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -19,11 +19,6 @@ namespace DB { -namespace ErrorCodes -{ -extern const int LOGICAL_ERROR; -} - StorageFileCluster::StorageFileCluster( const ContextPtr & context, const String & cluster_name_, diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 8c288f6931a..b54f88a030f 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -30,16 +30,12 @@ namespace DB { + namespace Setting { extern const SettingsUInt64 glob_expansion_max_elements; } -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - StorageURLCluster::StorageURLCluster( const ContextPtr & context, const String & cluster_name_, From 769fe9878229f902abe8e9501718f6a8fa69cb4c Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Thu, 14 Nov 2024 11:47:38 +0100 Subject: [PATCH 03/14] Fix linking issue --- src/TableFunctions/TableFunctionObjectStorage.cpp | 4 ++-- src/TableFunctions/TableFunctionURL.cpp | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 2f333b53669..998c8ca7bfc 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -30,7 +30,7 @@ namespace DB namespace Setting { - extern const SettingsBool use_parallel_replicas; + extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas; extern const SettingsBool parallel_replicas_for_cluster_engines; extern const SettingsString cluster_for_parallel_replicas; extern const SettingsParallelReplicasMode parallel_replicas_mode; @@ -145,7 +145,7 @@ StoragePtr TableFunctionObjectStorage::executeImpl( const auto & settings = context->getSettingsRef(); auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); - auto can_use_parallel_replicas = settings[Setting::use_parallel_replicas] + auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0 && settings[Setting::parallel_replicas_for_cluster_engines] && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS && !parallel_replicas_cluster_name.empty(); diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index 08c2aec30c3..aae2a5cb47a 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -24,7 +24,7 @@ namespace DB namespace Setting { - extern const SettingsBool use_parallel_replicas; + extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas; extern const SettingsBool parallel_replicas_for_cluster_engines; extern const SettingsString cluster_for_parallel_replicas; extern const SettingsParallelReplicasMode parallel_replicas_mode; @@ -152,14 +152,13 @@ StoragePtr TableFunctionURL::getStorage( const auto & settings = global_context->getSettingsRef(); auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); - auto can_use_parallel_replicas = settings[Setting::use_parallel_replicas] + auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0 && settings[Setting::parallel_replicas_for_cluster_engines] && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS && !parallel_replicas_cluster_name.empty(); if (can_use_parallel_replicas) { - LOG_DEBUG(&Poco::Logger::get("TableFunctionURL"), "TableFunctionURL::getStorage wrapped to cluster version"); return std::make_shared( global_context, settings[Setting::cluster_for_parallel_replicas], From e7ce3e1b7367cd440ebfa083ec8c4a2bc3e168c7 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Thu, 14 Nov 2024 15:45:11 +0100 Subject: [PATCH 04/14] Poke CI From 03352372e913b9e678548eabfddb06137875bce0 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Mon, 18 Nov 2024 13:50:06 +0100 Subject: [PATCH 05/14] Check for `isDistributed` --- src/TableFunctions/TableFunctionObjectStorage.cpp | 5 ++++- src/TableFunctions/TableFunctionURL.cpp | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 998c8ca7bfc..b0ded850305 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -148,7 +148,10 @@ StoragePtr TableFunctionObjectStorage::executeImpl( auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0 && settings[Setting::parallel_replicas_for_cluster_engines] && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS - && !parallel_replicas_cluster_name.empty(); + && !parallel_replicas_cluster_name.empty() + && !context->isDistributed(); + + LOG_DEBUG(&Poco::Logger::get("TableFunctionObjectStorage"), "Is distributed: {}", context->isDistributed()); if (can_use_parallel_replicas) { diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index aae2a5cb47a..2fd7090d811 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -17,6 +17,7 @@ #include #include +#include "Common/logger_useful.h" namespace DB @@ -155,7 +156,10 @@ StoragePtr TableFunctionURL::getStorage( auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0 && settings[Setting::parallel_replicas_for_cluster_engines] && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS - && !parallel_replicas_cluster_name.empty(); + && !parallel_replicas_cluster_name.empty() + && !global_context->isDistributed(); + + LOG_DEBUG(&Poco::Logger::get("TableFunctionURL"), "Is distributed: {}", global_context->isDistributed()); if (can_use_parallel_replicas) { From 3ff22e3fc19244689a37ac540f218ec7d6e5caf4 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Mon, 18 Nov 2024 18:56:02 +0100 Subject: [PATCH 06/14] Fix arguments parsing for object storage functions --- .../StorageObjectStorageCluster.cpp | 18 +++++++++++++----- ...ctTableFunctionArgumentsFromSelectQuery.cpp | 15 --------------- src/TableFunctions/ITableFunctionCluster.h | 2 +- .../TableFunctionObjectStorage.cpp | 2 -- src/TableFunctions/TableFunctionURL.cpp | 2 -- 5 files changed, 14 insertions(+), 25 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 176f20cf5ca..f66e413055d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -86,7 +86,9 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( const DB::StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { - ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); + auto * table_function = extractTableFunctionFromSelectQuery(query); + auto * expression_list = table_function->arguments->as(); + if (!expression_list) { throw Exception( @@ -105,10 +107,16 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( configuration->getEngineName()); } - ASTPtr cluster_name_arg = args.front(); - args.erase(args.begin()); - configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); - args.insert(args.begin(), cluster_name_arg); + if (table_function->name == configuration->getTypeName()) + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); + else + { + ASTPtr cluster_name_arg = args.front(); + args.erase(args.begin()); + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); + args.insert(args.begin(), cluster_name_arg); + } + } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp index dd4f05d1f56..1dde8f42ea5 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp @@ -10,21 +10,6 @@ namespace DB { -ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) -{ - auto * select_query = query->as(); - if (!select_query || !select_query->tables()) - return nullptr; - - auto * tables = select_query->tables()->as(); - auto * table_expression = tables->children[0]->as()->table_expression->as(); - if (!table_expression->table_function) - return nullptr; - - auto * table_function = table_expression->table_function->as(); - return table_function->arguments->as(); -} - ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) { auto * select_query = query->as(); diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index a0c715e298e..c76f5f2b95f 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -35,7 +35,7 @@ public: if (args.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected empty list of arguments for {}Cluster table function", Base::name); - if (table_function-> name == Base::name) + if (table_function->name == Base::name) Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); else { diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index b0ded850305..3be269923bc 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -151,8 +151,6 @@ StoragePtr TableFunctionObjectStorage::executeImpl( && !parallel_replicas_cluster_name.empty() && !context->isDistributed(); - LOG_DEBUG(&Poco::Logger::get("TableFunctionObjectStorage"), "Is distributed: {}", context->isDistributed()); - if (can_use_parallel_replicas) { storage = std::make_shared( diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index 2fd7090d811..67280d1fab8 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -159,8 +159,6 @@ StoragePtr TableFunctionURL::getStorage( && !parallel_replicas_cluster_name.empty() && !global_context->isDistributed(); - LOG_DEBUG(&Poco::Logger::get("TableFunctionURL"), "Is distributed: {}", global_context->isDistributed()); - if (can_use_parallel_replicas) { return std::make_shared( From 2886575776ee6ecd35d6f9f881f1ce7e3b2d0ef5 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Tue, 19 Nov 2024 18:28:01 +0100 Subject: [PATCH 07/14] Prototype a test --- ...er_functions_with_parallel_replicas.reference | 0 ..._cluster_functions_with_parallel_replicas.sql | 16 ++++++++++++++++ 2 files changed, 16 insertions(+) create mode 100644 tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference create mode 100644 tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql diff --git a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql new file mode 100644 index 00000000000..3b3649f80d9 --- /dev/null +++ b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql @@ -0,0 +1,16 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: Depends on AWS + +SET enable_parallel_replicas=1, max_parallel_replicas=3, parallel_replicas_for_non_replicated_merge_tree=1, cluster_for_parallel_replicas='test_cluster_two_shard_three_replicas_localhost'; +SELECT * FROM s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3 SETTINGS log_comment='03275_16cb4bb2-813a-43c2-8956-fa3520454020_parallel_replicas'; + +SET enable_parallel_replicas=0; +SELECT * FROM s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3 SETTINGS log_comment='03275_16cb4bb2-813a-43c2-8956-fa3520454020_single_replica'; + +SYSTEM FLUSH LOGS; + +SET enable_parallel_replicas=0; +SET max_rows_to_read = 0; -- system.text_log can be really big +SELECT count() > 0 FROM system.text_log +WHERE query_id in (select query_id from system.query_log where log_comment like '03275_16cb4bb2-813a-43c2-8956-fa3520454020%') + AND message LIKE '%Parallel reading from replicas is disabled for cluster%'; From b1e7847ee873d2791eb54d619bdc95bd4444af98 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Tue, 19 Nov 2024 18:37:44 +0100 Subject: [PATCH 08/14] Prototype a test --- .../03275_auto_cluster_functions_with_parallel_replicas.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql index 3b3649f80d9..8cdbdeb1b9b 100644 --- a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql +++ b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql @@ -12,5 +12,5 @@ SYSTEM FLUSH LOGS; SET enable_parallel_replicas=0; SET max_rows_to_read = 0; -- system.text_log can be really big SELECT count() > 0 FROM system.text_log -WHERE query_id in (select query_id from system.query_log where log_comment like '03275_16cb4bb2-813a-43c2-8956-fa3520454020%') +WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() and log_comment like '03275_16cb4bb2-813a-43c2-8956-fa3520454020%') AND message LIKE '%Parallel reading from replicas is disabled for cluster%'; From e3e6c79d724e535dc58a868364b3ba257e0ca0e8 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Wed, 20 Nov 2024 14:46:29 +0100 Subject: [PATCH 09/14] Lint --- src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp | 2 +- src/Storages/StorageFileCluster.cpp | 2 +- src/Storages/StorageURLCluster.cpp | 2 +- ...mSelectQuery.cpp => extractTableFunctionFromSelectQuery.cpp} | 2 +- ...sFromSelectQuery.h => extractTableFunctionFromSelectQuery.h} | 1 - src/TableFunctions/ITableFunctionCluster.h | 2 -- src/TableFunctions/TableFunctionURL.cpp | 1 - 7 files changed, 4 insertions(+), 8 deletions(-) rename src/Storages/{extractTableFunctionArgumentsFromSelectQuery.cpp => extractTableFunctionFromSelectQuery.cpp} (91%) rename src/Storages/{extractTableFunctionArgumentsFromSelectQuery.h => extractTableFunctionFromSelectQuery.h} (71%) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index f66e413055d..6c9cf68330a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include namespace DB diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 466ab98bf45..fab2d49baab 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index b54f88a030f..bd6da5446b4 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include #include diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp similarity index 91% rename from src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp rename to src/Storages/extractTableFunctionFromSelectQuery.cpp index 1dde8f42ea5..2040cd4b3ad 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h similarity index 71% rename from src/Storages/extractTableFunctionArgumentsFromSelectQuery.h rename to src/Storages/extractTableFunctionFromSelectQuery.h index 39c19ebb28e..c69cc7ce6c5 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -7,7 +7,6 @@ namespace DB { -ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); } diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index c76f5f2b95f..45757495dc0 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -7,8 +7,6 @@ #include #include -#include - namespace DB { diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index 67280d1fab8..daa83dc9735 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -17,7 +17,6 @@ #include #include -#include "Common/logger_useful.h" namespace DB From b47a95bd4f3553c59c94b22093d8aba5253e3a7a Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Wed, 20 Nov 2024 15:54:42 +0100 Subject: [PATCH 10/14] Remove unnecessary checks --- .../TableFunctionObjectStorage.cpp | 21 +------------------ src/TableFunctions/TableFunctionURL.cpp | 18 ---------------- 2 files changed, 1 insertion(+), 38 deletions(-) diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 3be269923bc..202e41b2fc2 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -123,27 +123,8 @@ StoragePtr TableFunctionObjectStorage::executeImpl( columns = cached_columns; StoragePtr storage; - - if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) - { - storage = std::make_shared( - configuration, - getObjectStorage(context, !is_insert_query), - context, - StorageID(getDatabaseName(), table_name), - columns, - ConstraintsDescription{}, - /* comment */ String{}, - /* format_settings */ std::nullopt, - /* mode */ LoadingStrictnessLevel::CREATE, - /* distributed_processing */ true, - /* partition_by */ nullptr); - - storage->startup(); - return storage; - } - const auto & settings = context->getSettingsRef(); + auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0 && settings[Setting::parallel_replicas_for_cluster_engines] diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index daa83dc9735..bedb4955c94 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -132,24 +132,6 @@ StoragePtr TableFunctionURL::getStorage( const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, const std::string & table_name, const String & compression_method_) const { - if (global_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) - { - return std::make_shared( - source, - StorageID(getDatabaseName(), table_name), - format_, - std::nullopt /*format settings*/, - columns, - ConstraintsDescription{}, - String{}, - global_context, - compression_method_, - configuration.headers, - configuration.http_method, - nullptr, - /*distributed_processing*/ true); - } - const auto & settings = global_context->getSettingsRef(); auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0 From 07821a576dda4d8b737c0050c04fac23acade585 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Wed, 20 Nov 2024 16:21:13 +0100 Subject: [PATCH 11/14] Cleaner test --- ..._functions_with_parallel_replicas.reference | 8 ++++++++ ...luster_functions_with_parallel_replicas.sql | 18 ++++++++---------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference index e69de29bb2d..7859daeb71f 100644 --- a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference +++ b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference @@ -0,0 +1,8 @@ +Expression (Project names) + ReadFromCluster +Expression (Project names) + ReadFromCluster +Expression ((Project names + (Projection + Change column names to column identifiers))) + ReadFromURL +Expression ((Project names + (Projection + Change column names to column identifiers))) + S3Source diff --git a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql index 8cdbdeb1b9b..ee0d355d849 100644 --- a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql +++ b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql @@ -1,16 +1,14 @@ -- Tags: no-fasttest -- Tag no-fasttest: Depends on AWS -SET enable_parallel_replicas=1, max_parallel_replicas=3, parallel_replicas_for_non_replicated_merge_tree=1, cluster_for_parallel_replicas='test_cluster_two_shard_three_replicas_localhost'; -SELECT * FROM s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3 SETTINGS log_comment='03275_16cb4bb2-813a-43c2-8956-fa3520454020_parallel_replicas'; +SET enable_parallel_replicas=1; +SET cluster_for_parallel_replicas='default'; +SET parallel_replicas_for_cluster_engines=true; -SET enable_parallel_replicas=0; -SELECT * FROM s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3 SETTINGS log_comment='03275_16cb4bb2-813a-43c2-8956-fa3520454020_single_replica'; +EXPLAIN SELECT * FROM url('http://localhost:8123'); +EXPLAIN SELECT * FROM s3('http://localhost:11111/test/a.tsv', 'TSV'); -SYSTEM FLUSH LOGS; +SET parallel_replicas_for_cluster_engines=false; -SET enable_parallel_replicas=0; -SET max_rows_to_read = 0; -- system.text_log can be really big -SELECT count() > 0 FROM system.text_log -WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() and log_comment like '03275_16cb4bb2-813a-43c2-8956-fa3520454020%') - AND message LIKE '%Parallel reading from replicas is disabled for cluster%'; +EXPLAIN SELECT * FROM url('http://localhost:8123'); +EXPLAIN SELECT * FROM s3('http://localhost:11111/test/a.tsv', 'TSV'); From 5b1f7c258ba656be6bd1178c6e4de50f01180610 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Wed, 20 Nov 2024 16:32:04 +0100 Subject: [PATCH 12/14] Stricter checks --- .../ObjectStorage/StorageObjectStorageCluster.cpp | 13 +++++++++++-- src/Storages/StorageURLCluster.cpp | 6 ++++++ src/TableFunctions/ITableFunctionCluster.h | 4 +++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 6c9cf68330a..42c66e1f49c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -87,8 +87,15 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( const ContextPtr & context) { auto * table_function = extractTableFunctionFromSelectQuery(query); - auto * expression_list = table_function->arguments->as(); + if (!table_function) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected SELECT query from table function {}, got '{}'", + configuration->getEngineName(), queryToString(query)); + } + auto * expression_list = table_function->arguments->as(); if (!expression_list) { throw Exception( @@ -109,13 +116,15 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( if (table_function->name == configuration->getTypeName()) configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); - else + else if (table_function->name == fmt::format("{}Cluster", configuration->getTypeName())) { ASTPtr cluster_name_arg = args.front(); args.erase(args.begin()); configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); args.insert(args.begin(), cluster_name_arg); } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected table function name: {}", table_function->name); } diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index bd6da5446b4..3f11ee5da1d 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -83,6 +83,12 @@ StorageURLCluster::StorageURLCluster( void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { auto * table_function = extractTableFunctionFromSelectQuery(query); + if (!table_function) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query)); + + auto * expression_list = table_function->arguments->as(); + if (!expression_list) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query)); TableFunctionURLCluster::updateStructureAndFormatArgumentsIfNeeded( table_function, diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index 45757495dc0..c935c586243 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -35,13 +35,15 @@ public: if (table_function->name == Base::name) Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); - else + else if (table_function->name == fmt::format("{}Cluster", Base::name)) { ASTPtr cluster_name_arg = args.front(); args.erase(args.begin()); Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); args.insert(args.begin(), cluster_name_arg); } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected table function name: {}", table_function->name); } protected: From f2a2941fa6a3fc67e27cb9d5f41e859566a3e79f Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Wed, 20 Nov 2024 18:26:58 +0100 Subject: [PATCH 13/14] Fix style check --- src/Storages/StorageURLCluster.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 3f11ee5da1d..1742c0a1c21 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -31,6 +31,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + namespace Setting { extern const SettingsUInt64 glob_expansion_max_elements; From 93a3c1d27e4e62dbf0158fc0e35372f9d9302a85 Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Wed, 20 Nov 2024 18:48:13 +0100 Subject: [PATCH 14/14] What will break if I do this --- src/Storages/IStorageCluster.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 219092e7ab5..337bb6284e8 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -125,7 +125,7 @@ void IStorageCluster::read( storage_snapshot->check(column_names); updateBeforeRead(context); - auto cluster = getCluster(context); + auto cluster = context->getCluster(cluster_name); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)