diff --git a/src/Interpreters/getTableExpressions.cpp b/src/Interpreters/getTableExpressions.cpp
index 830f0ea4411..43c9438ecec 100644
--- a/src/Interpreters/getTableExpressions.cpp
+++ b/src/Interpreters/getTableExpressions.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include <Parsers/ASTFunction.h>
 #include

 namespace DB
@@ -155,4 +156,20 @@ TablesWithColumns getDatabaseAndTablesWithColumns(
     return tables_with_columns;
 }

+ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
+{
+    auto * select_query = query->as<ASTSelectQuery>();
+    if (!select_query || !select_query->tables())
+        return nullptr;
+
+    auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
+    auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
+    if (!table_expression->table_function)
+        return nullptr;
+
+    auto * table_function = table_expression->table_function->as<ASTFunction>();
+    return table_function->arguments->as<ASTExpressionList>();
+}
+
+
 }
diff --git a/src/Interpreters/getTableExpressions.h b/src/Interpreters/getTableExpressions.h
index c4ca01ee3c3..4497b376cc7 100644
--- a/src/Interpreters/getTableExpressions.h
+++ b/src/Interpreters/getTableExpressions.h
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include <Parsers/ASTExpressionList.h>

 namespace DB
 {
@@ -23,4 +24,6 @@ ASTPtr extractTableExpression(const ASTSelectQuery & select, size_t table_number
 TablesWithColumns getDatabaseAndTablesWithColumns(
     const ASTTableExprConstPtrs & table_expressions, ContextPtr context, bool include_alias_cols, bool include_materialized_cols);

+ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);
+
 }
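Note: the new helper unwraps SELECT -> tables -> first table element -> table expression -> table function -> argument list, returning nullptr at any step where the query is not a plain SELECT from a table function. A minimal sketch of how it is meant to be driven (the parseQuery plumbing below is illustrative, not part of this diff):

// Illustrative driver for the new helper; assumes ClickHouse's standard
// parser entry points, error handling omitted.
#include <Interpreters/getTableExpressions.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>

using namespace DB;

ASTExpressionList * tableFunctionArgsOf(const String & text)
{
    ParserQuery parser(text.data() + text.size());
    ASTPtr query = parseQuery(
        parser, text.data(), text.data() + text.size(),
        /*description=*/"", /*max_query_size=*/0, /*max_parser_depth=*/0);
    /// nullptr unless the query is SELECT ... FROM someTableFunction(...).
    return extractTableFunctionArgumentsFromSelectQuery(query);
}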
diff --git a/src/Storages/HDFS/StorageHDFSCluster.cpp b/src/Storages/HDFS/StorageHDFSCluster.cpp
index 47a6fbf5eaa..7851532a6a4 100644
--- a/src/Storages/HDFS/StorageHDFSCluster.cpp
+++ b/src/Storages/HDFS/StorageHDFSCluster.cpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include <Interpreters/getTableExpressions.h>
 #include

@@ -32,6 +33,34 @@

 namespace DB
 {
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+static ASTPtr addColumnsStructureToQuery(const ASTPtr & query, const String & structure)
+{
+    /// Add argument with table structure to hdfsCluster table function in select query.
+    auto result_query = query->clone();
+    ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(result_query);
+    if (!expression_list)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function hdfsCluster, got '{}'", queryToString(query));
+
+    auto structure_literal = std::make_shared<ASTLiteral>(structure);
+
+    if (expression_list->children.size() != 2 && expression_list->children.size() != 3)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 or 3 arguments in hdfsCluster table function, got {}", expression_list->children.size());
+
+    if (expression_list->children.size() == 2)
+    {
+        auto format_literal = std::make_shared<ASTLiteral>("auto");
+        expression_list->children.push_back(format_literal);
+    }
+
+    expression_list->children.push_back(structure_literal);
+    return result_query;
+}
+
 StorageHDFSCluster::StorageHDFSCluster(
     ContextPtr context_,
     String cluster_name_,
@@ -56,6 +85,7 @@ StorageHDFSCluster::StorageHDFSCluster(
     {
         auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
         storage_metadata.setColumns(columns);
+        need_to_add_structure_to_query = true;
     }
     else
         storage_metadata.setColumns(columns_);
@@ -92,6 +122,11 @@ Pipe StorageHDFSCluster::read(

     const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;

+    auto query_to_send = query_info.original_query;
+    if (need_to_add_structure_to_query)
+        query_to_send = addColumnsStructureToQuery(
+            query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()));
+
     for (const auto & replicas : cluster->getShardsAddresses())
     {
         /// There will be only one replica, because we consider each replica as a shard
@@ -110,7 +145,7 @@ Pipe StorageHDFSCluster::read(
             /// So, task_identifier is passed as constructor argument. It is more obvious.
             auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
                 connection,
-                queryToString(query_info.original_query),
+                queryToString(query_to_send),
                 header,
                 context,
                 /*throttler=*/nullptr,
diff --git a/src/Storages/HDFS/StorageHDFSCluster.h b/src/Storages/HDFS/StorageHDFSCluster.h
index 21ae73c11ea..d5af257f803 100644
--- a/src/Storages/HDFS/StorageHDFSCluster.h
+++ b/src/Storages/HDFS/StorageHDFSCluster.h
@@ -44,6 +44,7 @@ private:
     String uri;
     String format_name;
     String compression_method;
+    bool need_to_add_structure_to_query = false;
 };
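The padding rule in addColumnsStructureToQuery is easier to see in isolation: hdfsCluster arrives with (cluster, uri) or (cluster, uri, format), and when the format slot is missing a literal 'auto' is pushed first so the structure always lands in the fourth position. A standalone model of the same rule, with plain strings standing in for ASTLiteral nodes (hypothetical helper, illustration only):

#include <stdexcept>
#include <string>
#include <vector>

void appendStructureArg(std::vector<std::string> & args, const std::string & structure)
{
    if (args.size() != 2 && args.size() != 3)
        throw std::logic_error("hdfsCluster expects 2 or 3 arguments");

    if (args.size() == 2)
        args.push_back("auto");   // fill the missing format slot

    args.push_back(structure);    // the structure is always the last argument
}

// {"cluster", "hdfs://host:9000/*.csv"} plus "id UInt64, s String" becomes
// {"cluster", "hdfs://host:9000/*.csv", "auto", "id UInt64, s String"}.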
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index e15956f78be..8a6349007ec 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -361,39 +361,6 @@ String StorageS3Source::KeysIterator::next()
     return pimpl->next();
 }

-class StorageS3Source::ReadTasksIterator::Impl
-{
-public:
-    explicit Impl(const std::vector<String> & read_tasks_, const ReadTaskCallback & new_read_tasks_callback_)
-        : read_tasks(read_tasks_), new_read_tasks_callback(new_read_tasks_callback_)
-    {
-    }
-
-    String next()
-    {
-        size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
-        if (current_index >= read_tasks.size())
-            return new_read_tasks_callback();
-        return read_tasks[current_index];
-    }
-
-private:
-    std::atomic_size_t index = 0;
-    std::vector<String> read_tasks;
-    ReadTaskCallback new_read_tasks_callback;
-};
-
-StorageS3Source::ReadTasksIterator::ReadTasksIterator(
-    const std::vector<String> & read_tasks_, const ReadTaskCallback & new_read_tasks_callback_)
-    : pimpl(std::make_shared<Impl>(read_tasks_, new_read_tasks_callback_))
-{
-}
-
-String StorageS3Source::ReadTasksIterator::next()
-{
-    return pimpl->next();
-}
-
 Block StorageS3Source::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
 {
     for (const auto & virtual_column : requested_virtual_columns)
@@ -802,8 +769,7 @@ StorageS3::StorageS3(
             distributed_processing_,
             is_key_with_globs,
             format_settings,
-            context_,
-            &read_tasks_used_in_schema_inference);
+            context_);
         storage_metadata.setColumns(columns);
     }
     else
@@ -831,19 +797,14 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
     ContextPtr local_context,
     ASTPtr query,
     const Block & virtual_block,
-    const std::vector<String> & read_tasks,
     std::unordered_map<String, S3::ObjectInfo> * object_infos,
     Strings * read_keys)
 {
     if (distributed_processing)
     {
         return std::make_shared<StorageS3Source::IteratorWrapper>(
-            [read_tasks_iterator = std::make_shared<StorageS3Source::ReadTasksIterator>(read_tasks, local_context->getReadTaskCallback()), read_keys]() -> String
-            {
-                auto key = read_tasks_iterator->next();
-                if (read_keys)
-                    read_keys->push_back(key);
-                return key;
+            [callback = local_context->getReadTaskCallback()]() -> String {
+                return callback();
             });
     }
     else if (is_key_with_globs)
@@ -903,7 +864,6 @@ Pipe StorageS3::read(
         local_context,
         query_info.query,
         virtual_block,
-        read_tasks_used_in_schema_inference,
         &object_infos);

     ColumnsDescription columns_description;
@@ -1201,7 +1161,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
     return getTableStructureFromDataImpl(
         configuration.format, s3_configuration, configuration.compression_method, distributed_processing,
-        s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
+        s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, object_infos);
 }

 ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@@ -1212,13 +1172,12 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
     bool is_key_with_globs,
     const std::optional<FormatSettings> & format_settings,
     ContextPtr ctx,
-    std::vector<String> * read_keys_in_distributed_processing,
     std::unordered_map<String, S3::ObjectInfo> * object_infos)
 {
     std::vector<String> read_keys;

     auto file_iterator
-        = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}, {}, object_infos, &read_keys);
+        = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}, object_infos, &read_keys);

     std::optional<ColumnsDescription> columns_from_cache;
     size_t prev_read_keys_size = read_keys.size();
@@ -1271,9 +1230,6 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
     if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)
         addColumnsToCache(read_keys, s3_configuration, columns, format, format_settings, ctx);

-    if (distributed_processing && read_keys_in_distributed_processing)
-        *read_keys_in_distributed_processing = std::move(read_keys);
-
     return columns;
 }
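With ReadTasksIterator gone, a worker no longer replays keys collected on the initiator during schema inference; every call simply pulls the next task from the initiator. A minimal model of the simplified iterator (the aliases stand in for ClickHouse's ReadTaskCallback and IteratorWrapper; the empty-string convention is an assumption of this sketch):

#include <functional>
#include <string>
#include <utility>

using ReadTaskCallback = std::function<std::string()>;
using IteratorWrapper = std::function<std::string()>;

IteratorWrapper makeDistributedIterator(ReadTaskCallback callback)
{
    // Each call forwards to the initiator; an empty result means "no more tasks".
    return [callback = std::move(callback)]() -> std::string { return callback(); };
}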
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index c63508c8e6a..7c6970bab0e 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -66,18 +66,6 @@ public:
         std::shared_ptr<Impl> pimpl;
     };

-    class ReadTasksIterator
-    {
-    public:
-        ReadTasksIterator(const std::vector<String> & read_tasks_, const ReadTaskCallback & new_read_tasks_callback_);
-        String next();
-
-    private:
-        class Impl;
-        /// shared_ptr to have copy constructor
-        std::shared_ptr<Impl> pimpl;
-    };
-
     using IteratorWrapper = std::function<String()>;

     static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
@@ -238,8 +226,6 @@ private:
     ASTPtr partition_by;
     bool is_key_with_globs = false;

-    std::vector<String> read_tasks_used_in_schema_inference;
-
     std::unordered_map<String, S3::ObjectInfo> object_infos;

     static void updateS3Configuration(ContextPtr, S3Configuration &);
@@ -252,7 +238,6 @@ private:
         ContextPtr local_context,
         ASTPtr query,
         const Block & virtual_block,
-        const std::vector<String> & read_tasks = {},
         std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr,
         Strings * read_keys = nullptr);

@@ -264,7 +249,6 @@ private:
         bool is_key_with_globs,
         const std::optional<FormatSettings> & format_settings,
         ContextPtr ctx,
-        std::vector<String> * read_keys_in_distributed_processing = nullptr,
         std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr);

     bool supportsSubsetOfColumns() const override;
diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp
index 800bce0afde..245888f5ce3 100644
--- a/src/Storages/StorageS3Cluster.cpp
+++ b/src/Storages/StorageS3Cluster.cpp
@@ -5,14 +5,9 @@
 #if USE_AWS_S3

 #include "Common/Exception.h"
-#include
 #include "Client/Connection.h"
 #include "Core/QueryProcessingStage.h"
-#include
-#include
-#include
 #include
-#include
 #include
 #include
 #include
@@ -23,28 +18,54 @@
 #include
 #include
 #include
-#include "Processors/ISource.h"
 #include
 #include
 #include
 #include
+#include <Interpreters/getTableExpressions.h>
 #include
 #include
 #include
+#include <Storages/StorageDictionary.h>
 #include
 #include
 #include
 #include
 #include
-#include
 #include
 #include
-#include
-#include

 namespace DB
 {
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+static ASTPtr addColumnsStructureToQuery(const ASTPtr & query, const String & structure)
+{
+    /// Add argument with table structure to s3Cluster table function in select query.
+    auto result_query = query->clone();
+    ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(result_query);
+    if (!expression_list)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function s3Cluster, got '{}'", queryToString(query));
+
+    auto structure_literal = std::make_shared<ASTLiteral>(structure);
+
+    if (expression_list->children.size() < 2 || expression_list->children.size() > 5)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 to 5 arguments in s3Cluster table function, got {}", expression_list->children.size());
+
+    if (expression_list->children.size() == 2 || expression_list->children.size() == 4)
+    {
+        auto format_literal = std::make_shared<ASTLiteral>("auto");
+        expression_list->children.push_back(format_literal);
+    }
+
+    expression_list->children.push_back(structure_literal);
+    return result_query;
+}
+
 StorageS3Cluster::StorageS3Cluster(
     const StorageS3ClusterConfiguration & configuration_,
     const StorageID & table_id_,
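For orientation, the size window above matches the s3Cluster signatures that can reach this point (reconstructed from the table function's accepted layouts; the rewrite only runs when the user gave no explicit structure):

//   2 args: (cluster, url)
//   3 args: (cluster, url, format)
//   4 args: (cluster, url, access_key_id, secret_access_key)
//   5 args: (cluster, url, access_key_id, secret_access_key, format)
//
// Sizes 2 and 4 are exactly the layouts without a format argument, so 'auto'
// is appended first and the structure always follows the format. For example
// (the structure shown is made up):
//
//   SELECT * FROM s3Cluster('default', 'https://bucket/*.csv')
// is rewritten to
//   SELECT * FROM s3Cluster('default', 'https://bucket/*.csv', 'auto', 'id UInt64, name String')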
@@ -72,6 +93,7 @@ StorageS3Cluster::StorageS3Cluster(
         auto columns = StorageS3::getTableStructureFromDataImpl(format_name, s3_configuration, compression_method,
             /*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_);
         storage_metadata.setColumns(columns);
+        need_to_add_structure_to_query = true;
     }
     else
         storage_metadata.setColumns(columns_);
@@ -117,6 +139,11 @@ Pipe StorageS3Cluster::read(

     const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;

+    ASTPtr query_to_send = query_info.original_query;
+    if (need_to_add_structure_to_query)
+        query_to_send = addColumnsStructureToQuery(
+            query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()));
+
     for (const auto & replicas : cluster->getShardsAddresses())
     {
         /// There will be only one replica, because we consider each replica as a shard
@@ -135,7 +162,7 @@ Pipe StorageS3Cluster::read(
             /// So, task_identifier is passed as constructor argument. It is more obvious.
             auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
                 connection,
-                queryToString(query_info.original_query),
+                queryToString(query_to_send),
                 header,
                 context,
                 /*throttler=*/nullptr,
diff --git a/src/Storages/StorageS3Cluster.h b/src/Storages/StorageS3Cluster.h
index e5ca3b58123..7e4d4801090 100644
--- a/src/Storages/StorageS3Cluster.h
+++ b/src/Storages/StorageS3Cluster.h
@@ -46,6 +46,7 @@ private:
     String compression_method;
     NamesAndTypesList virtual_columns;
     Block virtual_block;
+    bool need_to_add_structure_to_query = false;
 };
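The structure literal itself is produced by StorageDictionary::generateNamesAndTypesDescription, which renders the column list as comma-separated name-type pairs. A standalone mimic of the expected output format (hypothetical helper, illustration only):

#include <string>
#include <utility>
#include <vector>

std::string describeColumns(const std::vector<std::pair<std::string, std::string>> & columns)
{
    std::string description;
    for (const auto & [name, type] : columns)
    {
        if (!description.empty())
            description += ", ";
        description += name + " " + type;   // e.g. "id UInt64"
    }
    return description;   // e.g. "id UInt64, name String"
}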
diff --git a/src/TableFunctions/TableFunctionHDFSCluster.cpp b/src/TableFunctions/TableFunctionHDFSCluster.cpp
index 385d280a100..fd2ef7bb052 100644
--- a/src/TableFunctions/TableFunctionHDFSCluster.cpp
+++ b/src/TableFunctions/TableFunctionHDFSCluster.cpp
@@ -48,7 +48,7 @@ void TableFunctionHDFSCluster::parseArguments(const ASTPtr & ast_function, Conte
         const auto message = fmt::format(
             "The signature of table function {} shall be the following:\n" \
             " - cluster, uri\n",\
-            " - cluster, format\n",\
+            " - cluster, uri, format\n",\
             " - cluster, uri, format, structure\n",\
             " - cluster, uri, format, structure, compression_method", getName());
diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp
index 0bf33007760..0ceb7cd5e5b 100644
--- a/src/TableFunctions/TableFunctionS3.cpp
+++ b/src/TableFunctions/TableFunctionS3.cpp
@@ -64,7 +64,7 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
         if (args.size() == 4)
         {
             auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");

-            if (FormatFactory::instance().getAllFormats().contains(second_arg))
+            if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
                 args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}};
             else
@@ -77,7 +77,7 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
         {
             auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");

-            if (FormatFactory::instance().getAllFormats().contains(second_arg))
+            if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
                 args_to_idx = {{"format", 1}, {"structure", 2}};
             else
                 args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
diff --git a/src/TableFunctions/TableFunctionS3Cluster.cpp b/src/TableFunctions/TableFunctionS3Cluster.cpp
index 99c3ff85009..668cd67dd08 100644
--- a/src/TableFunctions/TableFunctionS3Cluster.cpp
+++ b/src/TableFunctions/TableFunctionS3Cluster.cpp
@@ -77,6 +77,7 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context

     /// StorageS3ClusterConfiguration inherits from StorageS3Configuration, so it is safe to upcast it.
     TableFunctionS3::parseArgumentsImpl(message, clipped_args, context, static_cast<StorageS3Configuration &>(configuration));
+    LOG_DEBUG(&Poco::Logger::get("TableFunctionS3Cluster"), "Structure: {}", configuration.structure);
 }
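The TableFunctionS3 change closes the loop: a worker can now receive 'auto' in the format position (appended by the rewrite above), and since 'auto' is not a registered format name, the old check would have misclassified it as an access_key_id. A sketch of the disambiguation rule (hypothetical helper; a std::set stands in for FormatFactory::instance().getAllFormats()):

#include <set>
#include <string>

bool secondArgIsFormat(const std::string & arg, const std::set<std::string> & known_formats)
{
    // 'auto' must be routed to the format slot even though it is not registered.
    return arg == "auto" || known_formats.count(arg) > 0;
}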