diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 9f913a4ff9a..c9261518f4b 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5653,6 +5653,9 @@ Parts virtually divided into segments to be distributed between replicas for par DECLARE(Bool, parallel_replicas_local_plan, true, R"( Build local plan for local replica )", BETA) \ + DECLARE(Bool, parallel_replicas_for_cluster_engines, true, R"( +Replace table function engines with their -Cluster alternatives +)", EXPERIMENTAL) \ \ DECLARE(Bool, allow_experimental_analyzer, true, R"( Allow new query analyzer. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index f0d3e001362..93287721eeb 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -60,6 +60,7 @@ static std::initializer_listcheck(column_names); updateBeforeRead(context); - auto cluster = getCluster(context); + auto cluster = context->getCluster(cluster_name); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 176f20cf5ca..42c66e1f49c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include namespace DB @@ -86,7 +86,16 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( const DB::StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { - ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); + auto * table_function = extractTableFunctionFromSelectQuery(query); + if (!table_function) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected SELECT query from table function {}, got '{}'", + configuration->getEngineName(), queryToString(query)); + } + + auto * expression_list = table_function->arguments->as(); if (!expression_list) { throw Exception( @@ -105,10 +114,18 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( configuration->getEngineName()); } - ASTPtr cluster_name_arg = args.front(); - args.erase(args.begin()); - configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); - args.insert(args.begin(), cluster_name_arg); + if (table_function->name == configuration->getTypeName()) + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); + else if (table_function->name == fmt::format("{}Cluster", configuration->getTypeName())) + { + ASTPtr cluster_name_arg = args.front(); + args.erase(args.begin()); + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); + args.insert(args.begin(), cluster_name_arg); + } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected table function name: {}", table_function->name); + } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index c01738067c4..fab2d49baab 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include @@ -19,11 +19,6 @@ namespace DB { -namespace ErrorCodes -{ -extern const int LOGICAL_ERROR; -} - StorageFileCluster::StorageFileCluster( const ContextPtr & context, const String & cluster_name_, @@ -66,12 +61,14 @@ StorageFileCluster::StorageFileCluster( void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const DB::ContextPtr & context) { - ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); - if (!expression_list) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function fileCluster, got '{}'", queryToString(query)); + auto * table_function = extractTableFunctionFromSelectQuery(query); TableFunctionFileCluster::updateStructureAndFormatArgumentsIfNeeded( - expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); + table_function, + storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), + format_name, + context + ); } RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 04fd5fb9675..1742c0a1c21 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include #include @@ -30,16 +30,17 @@ namespace DB { -namespace Setting -{ - extern const SettingsUInt64 glob_expansion_max_elements; -} namespace ErrorCodes { extern const int LOGICAL_ERROR; } +namespace Setting +{ + extern const SettingsUInt64 glob_expansion_max_elements; +} + StorageURLCluster::StorageURLCluster( const ContextPtr & context, const String & cluster_name_, @@ -86,12 +87,20 @@ StorageURLCluster::StorageURLCluster( void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { - ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); + auto * table_function = extractTableFunctionFromSelectQuery(query); + if (!table_function) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query)); + + auto * expression_list = table_function->arguments->as(); if (!expression_list) throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query)); TableFunctionURLCluster::updateStructureAndFormatArgumentsIfNeeded( - expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); + table_function, + storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), + format_name, + context + ); } RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp similarity index 65% rename from src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp rename to src/Storages/extractTableFunctionFromSelectQuery.cpp index 382964d9fe1..2040cd4b3ad 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -1,7 +1,6 @@ -#include +#include #include -#include #include #include #include @@ -11,7 +10,7 @@ namespace DB { -ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) { auto * select_query = query->as(); if (!select_query || !select_query->tables()) @@ -22,8 +21,7 @@ ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) if (!table_expression->table_function) return nullptr; - auto * table_function = table_expression->table_function->as(); - return table_function->arguments->as(); + return table_expression->table_function->as(); } } diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h similarity index 50% rename from src/Storages/extractTableFunctionArgumentsFromSelectQuery.h rename to src/Storages/extractTableFunctionFromSelectQuery.h index af19ef656cc..c69cc7ce6c5 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -2,10 +2,11 @@ #include #include +#include namespace DB { -ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); } diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index 6935ac39e79..c935c586243 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -24,15 +25,25 @@ class ITableFunctionCluster : public Base public: String getName() const override = 0; - static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context) + static void updateStructureAndFormatArgumentsIfNeeded(ASTFunction * table_function, const String & structure_, const String & format_, const ContextPtr & context) { + auto * expression_list = table_function->arguments->as(); + ASTs args = expression_list->children; + if (args.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected empty list of arguments for {}Cluster table function", Base::name); - ASTPtr cluster_name_arg = args.front(); - args.erase(args.begin()); - Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); - args.insert(args.begin(), cluster_name_arg); + if (table_function->name == Base::name) + Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); + else if (table_function->name == fmt::format("{}Cluster", Base::name)) + { + ASTPtr cluster_name_arg = args.front(); + args.erase(args.begin()); + Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); + args.insert(args.begin(), cluster_name_arg); + } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected table function name: {}", table_function->name); } protected: diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 1ed803ae5ce..6308084e8e8 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -1,5 +1,8 @@ #include "config.h" +#include +#include + #include #include #include @@ -19,11 +22,20 @@ #include #include #include +#include namespace DB { +namespace Setting +{ + extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas; + extern const SettingsBool parallel_replicas_for_cluster_engines; + extern const SettingsString cluster_for_parallel_replicas; + extern const SettingsParallelReplicasMode parallel_replicas_mode; +} + namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; @@ -100,8 +112,9 @@ StoragePtr TableFunctionObjectStorage::executeImpl( ColumnsDescription cached_columns, bool is_insert_query) const { - ColumnsDescription columns; chassert(configuration); + ColumnsDescription columns; + if (configuration->structure != "auto") columns = parseColumnsListFromString(configuration->structure, context); else if (!structure_hint.empty()) @@ -109,18 +122,43 @@ StoragePtr TableFunctionObjectStorage::executeImpl( else if (!cached_columns.empty()) columns = cached_columns; - StoragePtr storage = std::make_shared( + StoragePtr storage; + const auto & settings = context->getSettingsRef(); + + auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); + auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0 + && settings[Setting::parallel_replicas_for_cluster_engines] + && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS + && !parallel_replicas_cluster_name.empty() + && !context->isDistributed(); + + if (can_use_parallel_replicas) + { + storage = std::make_shared( + parallel_replicas_cluster_name, + configuration, + getObjectStorage(context, !is_insert_query), + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + context); + + storage->startup(); + return storage; + } + + storage = std::make_shared( configuration, getObjectStorage(context, !is_insert_query), context, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, - String{}, + /* comment */ String{}, /* format_settings */ std::nullopt, /* mode */ LoadingStrictnessLevel::CREATE, /* distributed_processing */ false, - nullptr); + /* partition_by */ nullptr); storage->startup(); return storage; diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 19cd637bd80..9c78342f611 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -6,8 +6,10 @@ #include #include #include + #include "config.h" + namespace DB { diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index 8f4841a992b..bedb4955c94 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -2,22 +2,34 @@ #include "registerTableFunctions.h" #include +#include +#include +#include +#include #include +#include #include #include #include #include +#include #include -#include -#include -#include -#include #include #include + + namespace DB { +namespace Setting +{ + extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas; + extern const SettingsBool parallel_replicas_for_cluster_engines; + extern const SettingsString cluster_for_parallel_replicas; + extern const SettingsParallelReplicasMode parallel_replicas_mode; +} + std::vector TableFunctionURL::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const { auto & table_function_node = query_node_table_function->as(); @@ -120,6 +132,28 @@ StoragePtr TableFunctionURL::getStorage( const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, const std::string & table_name, const String & compression_method_) const { + const auto & settings = global_context->getSettingsRef(); + auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); + auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0 + && settings[Setting::parallel_replicas_for_cluster_engines] + && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS + && !parallel_replicas_cluster_name.empty() + && !global_context->isDistributed(); + + if (can_use_parallel_replicas) + { + return std::make_shared( + global_context, + settings[Setting::cluster_for_parallel_replicas], + filename, + format, + compression_method, + StorageID(getDatabaseName(), table_name), + getActualTableStructure(global_context, /* is_insert_query */ true), + ConstraintsDescription{}, + configuration); + } + return std::make_shared( source, StorageID(getDatabaseName(), table_name), diff --git a/src/TableFunctions/TableFunctionURLCluster.cpp b/src/TableFunctions/TableFunctionURLCluster.cpp index 5fd3c3342a5..f70b7f5c882 100644 --- a/src/TableFunctions/TableFunctionURLCluster.cpp +++ b/src/TableFunctions/TableFunctionURLCluster.cpp @@ -10,11 +10,10 @@ StoragePtr TableFunctionURLCluster::getStorage( const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context, const std::string & table_name, const String & /*compression_method_*/) const { - StoragePtr storage; if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) { //On worker node this uri won't contain globs - storage = std::make_shared( + return std::make_shared( filename, StorageID(getDatabaseName(), table_name), format, @@ -29,20 +28,17 @@ StoragePtr TableFunctionURLCluster::getStorage( nullptr, /*distributed_processing=*/ true); } - else - { - storage = std::make_shared( - context, - cluster_name, - filename, - format, - compression_method, - StorageID(getDatabaseName(), table_name), - getActualTableStructure(context, /* is_insert_query */ true), - ConstraintsDescription{}, - configuration); - } - return storage; + + return std::make_shared( + context, + cluster_name, + filename, + format, + compression_method, + StorageID(getDatabaseName(), table_name), + getActualTableStructure(context, /* is_insert_query */ true), + ConstraintsDescription{}, + configuration); } void registerTableFunctionURLCluster(TableFunctionFactory & factory) diff --git a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference new file mode 100644 index 00000000000..7859daeb71f --- /dev/null +++ b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.reference @@ -0,0 +1,8 @@ +Expression (Project names) + ReadFromCluster +Expression (Project names) + ReadFromCluster +Expression ((Project names + (Projection + Change column names to column identifiers))) + ReadFromURL +Expression ((Project names + (Projection + Change column names to column identifiers))) + S3Source diff --git a/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql new file mode 100644 index 00000000000..ee0d355d849 --- /dev/null +++ b/tests/queries/0_stateless/03275_auto_cluster_functions_with_parallel_replicas.sql @@ -0,0 +1,14 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: Depends on AWS + +SET enable_parallel_replicas=1; +SET cluster_for_parallel_replicas='default'; +SET parallel_replicas_for_cluster_engines=true; + +EXPLAIN SELECT * FROM url('http://localhost:8123'); +EXPLAIN SELECT * FROM s3('http://localhost:11111/test/a.tsv', 'TSV'); + +SET parallel_replicas_for_cluster_engines=false; + +EXPLAIN SELECT * FROM url('http://localhost:8123'); +EXPLAIN SELECT * FROM s3('http://localhost:11111/test/a.tsv', 'TSV');