diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 907edf8cc18..41459404d08 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5643,6 +5643,9 @@ Parts virtually divided into segments to be distributed between replicas for par DECLARE(Bool, parallel_replicas_local_plan, true, R"( Build local plan for local replica )", BETA) \ + DECLARE(Bool, parallel_replicas_for_cluster_engines, true, R"( +Replace table function engines with their -Cluster alternatives +)", EXPERIMENTAL) \ \ DECLARE(Bool, allow_experimental_analyzer, true, R"( Allow new query analyzer. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 64913f03754..58a5e105111 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -60,6 +60,7 @@ static std::initializer_listchildren, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); + table_function, + storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), + format_name, + context + ); } RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 04fd5fb9675..8c288f6931a 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -86,12 +86,14 @@ StorageURLCluster::StorageURLCluster( void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { - ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); - if (!expression_list) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query)); + auto * table_function = extractTableFunctionFromSelectQuery(query); TableFunctionURLCluster::updateStructureAndFormatArgumentsIfNeeded( - expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); + table_function, + storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), + format_name, + context + ); } RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp index 382964d9fe1..dd4f05d1f56 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp @@ -1,7 +1,6 @@ #include #include -#include #include #include #include @@ -26,4 +25,18 @@ ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) return table_function->arguments->as(); } +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + auto * select_query = query->as(); + if (!select_query || !select_query->tables()) + return nullptr; + + auto * tables = select_query->tables()->as(); + auto * table_expression = tables->children[0]->as()->table_expression->as(); + if (!table_expression->table_function) + return nullptr; + + return table_expression->table_function->as(); +} + } diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h index af19ef656cc..39c19ebb28e 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h +++ b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h @@ -2,10 +2,12 @@ #include #include +#include namespace DB { ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); } diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index 6935ac39e79..a0c715e298e 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -2,10 +2,13 @@ #include #include +#include #include #include #include +#include + namespace DB { @@ -24,15 +27,23 @@ class ITableFunctionCluster : public Base public: String getName() const override = 0; - static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context) + static void updateStructureAndFormatArgumentsIfNeeded(ASTFunction * table_function, const String & structure_, const String & format_, const ContextPtr & context) { + auto * expression_list = table_function->arguments->as(); + ASTs args = expression_list->children; + if (args.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected empty list of arguments for {}Cluster table function", Base::name); - ASTPtr cluster_name_arg = args.front(); - args.erase(args.begin()); - Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); - args.insert(args.begin(), cluster_name_arg); + if (table_function-> name == Base::name) + Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); + else + { + ASTPtr cluster_name_arg = args.front(); + args.erase(args.begin()); + Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); + args.insert(args.begin(), cluster_name_arg); + } } protected: diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 12de08afad0..2f333b53669 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -1,5 +1,8 @@ #include "config.h" +#include +#include + #include #include #include @@ -19,11 +22,20 @@ #include #include #include +#include namespace DB { +namespace Setting +{ + extern const SettingsBool use_parallel_replicas; + extern const SettingsBool parallel_replicas_for_cluster_engines; + extern const SettingsString cluster_for_parallel_replicas; + extern const SettingsParallelReplicasMode parallel_replicas_mode; +} + namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; @@ -100,8 +112,9 @@ StoragePtr TableFunctionObjectStorage::executeImpl( ColumnsDescription cached_columns, bool is_insert_query) const { - ColumnsDescription columns; chassert(configuration); + ColumnsDescription columns; + if (configuration->structure != "auto") columns = parseColumnsListFromString(configuration->structure, context); else if (!structure_hint.empty()) @@ -109,18 +122,61 @@ StoragePtr TableFunctionObjectStorage::executeImpl( else if (!cached_columns.empty()) columns = cached_columns; - StoragePtr storage = std::make_shared( + StoragePtr storage; + + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + storage = std::make_shared( + configuration, + getObjectStorage(context, !is_insert_query), + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + /* comment */ String{}, + /* format_settings */ std::nullopt, + /* mode */ LoadingStrictnessLevel::CREATE, + /* distributed_processing */ true, + /* partition_by */ nullptr); + + storage->startup(); + return storage; + } + + const auto & settings = context->getSettingsRef(); + auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); + auto can_use_parallel_replicas = settings[Setting::use_parallel_replicas] + && settings[Setting::parallel_replicas_for_cluster_engines] + && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS + && !parallel_replicas_cluster_name.empty(); + + if (can_use_parallel_replicas) + { + storage = std::make_shared( + parallel_replicas_cluster_name, + configuration, + getObjectStorage(context, !is_insert_query), + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + context); + + storage->startup(); + return storage; + } + + storage = std::make_shared( configuration, getObjectStorage(context, !is_insert_query), context, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, - String{}, + /* comment */ String{}, /* format_settings */ std::nullopt, /* mode */ LoadingStrictnessLevel::CREATE, /* distributed_processing */ false, - nullptr); + /* partition_by */ nullptr); storage->startup(); return storage; diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 19cd637bd80..9c78342f611 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -6,8 +6,10 @@ #include #include #include + #include "config.h" + namespace DB { diff --git a/src/TableFunctions/TableFunctionURL.cpp b/src/TableFunctions/TableFunctionURL.cpp index 8f4841a992b..08c2aec30c3 100644 --- a/src/TableFunctions/TableFunctionURL.cpp +++ b/src/TableFunctions/TableFunctionURL.cpp @@ -2,22 +2,34 @@ #include "registerTableFunctions.h" #include +#include +#include +#include +#include #include +#include #include #include #include #include +#include #include -#include -#include -#include -#include #include #include + + namespace DB { +namespace Setting +{ + extern const SettingsBool use_parallel_replicas; + extern const SettingsBool parallel_replicas_for_cluster_engines; + extern const SettingsString cluster_for_parallel_replicas; + extern const SettingsParallelReplicasMode parallel_replicas_mode; +} + std::vector TableFunctionURL::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const { auto & table_function_node = query_node_table_function->as(); @@ -120,6 +132,46 @@ StoragePtr TableFunctionURL::getStorage( const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, const std::string & table_name, const String & compression_method_) const { + if (global_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + return std::make_shared( + source, + StorageID(getDatabaseName(), table_name), + format_, + std::nullopt /*format settings*/, + columns, + ConstraintsDescription{}, + String{}, + global_context, + compression_method_, + configuration.headers, + configuration.http_method, + nullptr, + /*distributed_processing*/ true); + } + + const auto & settings = global_context->getSettingsRef(); + auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString(); + auto can_use_parallel_replicas = settings[Setting::use_parallel_replicas] + && settings[Setting::parallel_replicas_for_cluster_engines] + && settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS + && !parallel_replicas_cluster_name.empty(); + + if (can_use_parallel_replicas) + { + LOG_DEBUG(&Poco::Logger::get("TableFunctionURL"), "TableFunctionURL::getStorage wrapped to cluster version"); + return std::make_shared( + global_context, + settings[Setting::cluster_for_parallel_replicas], + filename, + format, + compression_method, + StorageID(getDatabaseName(), table_name), + getActualTableStructure(global_context, /* is_insert_query */ true), + ConstraintsDescription{}, + configuration); + } + return std::make_shared( source, StorageID(getDatabaseName(), table_name), diff --git a/src/TableFunctions/TableFunctionURLCluster.cpp b/src/TableFunctions/TableFunctionURLCluster.cpp index 5fd3c3342a5..f70b7f5c882 100644 --- a/src/TableFunctions/TableFunctionURLCluster.cpp +++ b/src/TableFunctions/TableFunctionURLCluster.cpp @@ -10,11 +10,10 @@ StoragePtr TableFunctionURLCluster::getStorage( const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context, const std::string & table_name, const String & /*compression_method_*/) const { - StoragePtr storage; if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) { //On worker node this uri won't contain globs - storage = std::make_shared( + return std::make_shared( filename, StorageID(getDatabaseName(), table_name), format, @@ -29,20 +28,17 @@ StoragePtr TableFunctionURLCluster::getStorage( nullptr, /*distributed_processing=*/ true); } - else - { - storage = std::make_shared( - context, - cluster_name, - filename, - format, - compression_method, - StorageID(getDatabaseName(), table_name), - getActualTableStructure(context, /* is_insert_query */ true), - ConstraintsDescription{}, - configuration); - } - return storage; + + return std::make_shared( + context, + cluster_name, + filename, + format, + compression_method, + StorageID(getDatabaseName(), table_name), + getActualTableStructure(context, /* is_insert_query */ true), + ConstraintsDescription{}, + configuration); } void registerTableFunctionURLCluster(TableFunctionFactory & factory)