diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index 0c4ed6482dc..2ae90ff7a31 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -318,7 +318,8 @@ StorageS3::StorageS3(
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
     ContextPtr context_,
-    const String & compression_method_)
+    const String & compression_method_,
+    bool distributed_processing_)
     : IStorage(table_id_)
     , client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
     , format_name(format_name_)
@@ -326,6 +327,7 @@ StorageS3::StorageS3(
     , max_single_part_upload_size(max_single_part_upload_size_)
     , compression_method(compression_method_)
     , name(uri_.storage_name)
+    , distributed_processing(distributed_processing_)
 {
     context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
     StorageInMemoryMetadata storage_metadata;
@@ -358,13 +360,24 @@ Pipe StorageS3::read(
             need_file_column = true;
     }

-    /// Iterate through disclosed globs and make a source for each file
-    auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
-    auto iterator_wrapper = std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
+    std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper{nullptr};
+    if (distributed_processing)
     {
-        return glob_iterator->next();
-    });
-
+        iterator_wrapper = std::make_shared<StorageS3Source::IteratorWrapper>(
+            [callback = local_context->getReadTaskCallback()]() -> String {
+                return callback();
+        });
+    }
+    else
+    {
+        /// Iterate through disclosed globs and make a source for each file
+        auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
+        iterator_wrapper = std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
+        {
+            return glob_iterator->next();
+        });
+    }
+
     for (size_t i = 0; i < num_streams; ++i)
     {
         pipes.emplace_back(std::make_shared<StorageS3Source>(
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index 512074479e5..1e1d76fa6e3 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -106,7 +106,8 @@ public:
         const ColumnsDescription & columns_,
         const ConstraintsDescription & constraints_,
         ContextPtr context_,
-        const String & compression_method_ = "");
+        const String & compression_method_ = "",
+        bool distributed_processing_ = false);

     String getName() const override
     {
@@ -130,7 +131,6 @@ private:

     friend class StorageS3Cluster;
     friend class TableFunctionS3Cluster;
-    friend class StorageS3SequentialSource;

     struct ClientAuthentificaiton
     {
@@ -149,6 +149,7 @@ private:
     size_t max_single_part_upload_size;
     String compression_method;
     String name;
+    const bool distributed_processing;

     static void updateClientAndAuthSettings(ContextPtr, ClientAuthentificaiton &);
 };
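
Taken together, the StorageS3.cpp/StorageS3.h changes above make the file-name iterator handed to each source either a wrapper around the initiator's read-task callback (when distributed_processing is set) or a wrapper around a local iterator over glob-expanded keys. Below is a standalone sketch of that dispatch using plain STL types; all names (FileNameSupplier, makeLocalSupplier, makeRemoteSupplier) are hypothetical stand-ins rather than ClickHouse APIs, and an empty string plays the role of "no more files".

// Hypothetical, standalone sketch (not ClickHouse code) of the dispatch in StorageS3::read.
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using FileNameSupplier = std::function<std::string()>;  // stand-in for StorageS3Source::IteratorWrapper

// Local mode: iterate over keys that were already expanded from a glob.
FileNameSupplier makeLocalSupplier(std::vector<std::string> keys)
{
    auto state = std::make_shared<std::pair<std::vector<std::string>, size_t>>(std::move(keys), 0);
    return [state]() -> std::string
    {
        if (state->second == state->first.size())
            return {};                      // exhausted
        return state->first[state->second++];
    };
}

// Distributed mode: every call asks the initiator for the next task.
FileNameSupplier makeRemoteSupplier(std::function<std::string()> read_task_callback)
{
    return [callback = std::move(read_task_callback)]() -> std::string { return callback(); };
}

int main()
{
    bool distributed_processing = false;    // mirrors the new StorageS3 member

    FileNameSupplier next_file = distributed_processing
        ? makeRemoteSupplier([] { return std::string{}; })             // would be ctx->getReadTaskCallback()
        : makeLocalSupplier({"data/part-000.csv", "data/part-001.csv"});

    // Each source (one per stream) pulls file names until the supplier is exhausted.
    for (std::string file = next_file(); !file.empty(); file = next_file())
        std::cout << "reading " << file << '\n';
}
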
diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp
index e8b73bc2acb..8afc0e44023 100644
--- a/src/Storages/StorageS3Cluster.cpp
+++ b/src/Storages/StorageS3Cluster.cpp
@@ -79,58 +79,18 @@ StorageS3Cluster::StorageS3Cluster(
     StorageS3::updateClientAndAuthSettings(context_, client_auth);
 }

-
+/// The code executes on the initiator
 Pipe StorageS3Cluster::read(
     const Names & column_names,
     const StorageMetadataPtr & metadata_snapshot,
     SelectQueryInfo & query_info,
     ContextPtr context,
     QueryProcessingStage::Enum processed_stage,
-    size_t max_block_size,
-    unsigned num_streams)
+    size_t /*max_block_size*/,
+    unsigned /*num_streams*/)
 {
     StorageS3::updateClientAndAuthSettings(context, client_auth);

-    /// Secondary query, need to read from S3
-    if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
-    {
-        bool need_path_column = false;
-        bool need_file_column = false;
-        for (const auto & column : column_names)
-        {
-            if (column == "_path")
-                need_path_column = true;
-            if (column == "_file")
-                need_file_column = true;
-        }
-
-        /// Save callback not to capture context by reference of copy it.
-        auto file_iterator = std::make_shared<StorageS3Source::IteratorWrapper>(
-            [callback = context->getReadTaskCallback()]() -> String {
-                return callback();
-        });
-
-        Pipes pipes;
-        for (size_t i = 0; i < num_streams; ++i)
-        {
-            pipes.emplace_back(std::make_shared<StorageS3Source>(
-                need_path_column, need_file_column, format_name, getName(),
-                metadata_snapshot->getSampleBlock(), context,
-                metadata_snapshot->getColumns(), max_block_size,
-                compression_method,
-                client_auth.client,
-                client_auth.uri.bucket,
-                file_iterator
-            ));
-        }
-        auto pipe = Pipe::unitePipes(std::move(pipes));
-
-        narrowPipe(pipe, num_streams);
-        return pipe;
-    }
-
-    /// The code from here and below executes on initiator
-
     auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettings());
     S3::URI s3_uri(Poco::URI{filename});
     StorageS3::updateClientAndAuthSettings(context, client_auth);
diff --git a/src/TableFunctions/TableFunctionS3Cluster.cpp b/src/TableFunctions/TableFunctionS3Cluster.cpp
index 30526057ca5..4a48ed83135 100644
--- a/src/TableFunctions/TableFunctionS3Cluster.cpp
+++ b/src/TableFunctions/TableFunctionS3Cluster.cpp
@@ -102,11 +102,29 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
     const ASTPtr & /*function*/, ContextPtr context,
     const std::string & table_name, ColumnsDescription /*cached_columns*/) const
 {
-    StoragePtr storage = StorageS3Cluster::create(
-        filename, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
-        cluster_name, format, context->getSettingsRef().s3_max_connections,
-        getActualTableStructure(context), ConstraintsDescription{},
-        context, compression_method);
+    StoragePtr storage;
+    if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
+    {
+        /// On a worker node this filename won't contain globs
+        Poco::URI uri (filename);
+        S3::URI s3_uri (uri);
+        /// Actually these parameters are not used
+        UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
+        UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
+        UInt64 max_connections = context->getSettingsRef().s3_max_connections;
+        storage = StorageS3::create(
+            s3_uri, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
+            format, min_upload_part_size, max_single_part_upload_size, max_connections,
+            getActualTableStructure(context), ConstraintsDescription{},
+            context, compression_method, /*distributed_processing=*/true);
+    }
+    else {
+        storage = StorageS3Cluster::create(
+            filename, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
+            cluster_name, format, context->getSettingsRef().s3_max_connections,
+            getActualTableStructure(context), ConstraintsDescription{},
+            context, compression_method);
+    }

     storage->startup();
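
With this split, TableFunctionS3Cluster::executeImpl decides per node what to build: a worker that received the rewritten secondary query creates a plain StorageS3 with distributed_processing=true, which pulls S3 keys from the initiator through the read-task callback, while the initiator creates StorageS3Cluster, which expands the glob and hands keys out to the cluster. A minimal sketch of that query-kind dispatch, with hypothetical types that only mimic the shape of the real storages (not ClickHouse code), might look like this:

// Hypothetical sketch: choose the storage implementation from the query kind,
// the way TableFunctionS3Cluster::executeImpl does above.
#include <iostream>
#include <memory>
#include <string>

enum class QueryKind { InitialQuery, SecondaryQuery };

struct IStorage
{
    virtual ~IStorage() = default;
    virtual std::string describe() const = 0;
};

// Worker side: reads only the keys the initiator hands out (distributed_processing == true).
struct WorkerS3Storage : IStorage
{
    std::string describe() const override { return "StorageS3-like: pulls next key via read-task callback"; }
};

// Initiator side: expands the glob and schedules keys across all shard replicas.
struct ClusterS3Storage : IStorage
{
    std::string describe() const override { return "StorageS3Cluster-like: dispatches keys to the cluster"; }
};

std::shared_ptr<IStorage> makeStorage(QueryKind kind)
{
    if (kind == QueryKind::SecondaryQuery)
        return std::make_shared<WorkerS3Storage>();
    return std::make_shared<ClusterS3Storage>();
}

int main()
{
    std::cout << makeStorage(QueryKind::InitialQuery)->describe() << '\n';
    std::cout << makeStorage(QueryKind::SecondaryQuery)->describe() << '\n';
}
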