From 2555ae5d3f7f88b46cc5611c149136ed8357d31d Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 7 Apr 2021 17:43:34 +0300 Subject: [PATCH] better processing stage --- src/Client/Connection.h | 2 + src/Storages/StorageS3Distributed.cpp | 74 ++++++++++++--------------- src/Storages/StorageS3Distributed.h | 5 +- 3 files changed, 36 insertions(+), 45 deletions(-) diff --git a/src/Client/Connection.h b/src/Client/Connection.h index 7c21a282ce1..502cf8ad9e8 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -65,6 +65,8 @@ struct Packet Progress progress; BlockStreamProfileInfo profile_info; std::vector part_uuids; + /// String describes an identifier for a request. + /// Used for dynamic distributed data processing (S3 downloading) String read_task_request; Packet() : type(Protocol::Server::Hello) {} diff --git a/src/Storages/StorageS3Distributed.cpp b/src/Storages/StorageS3Distributed.cpp index 2a257ed922e..604750e4e9a 100644 --- a/src/Storages/StorageS3Distributed.cpp +++ b/src/Storages/StorageS3Distributed.cpp @@ -90,8 +90,8 @@ public: const StorageS3SourceBuilder & s3_builder_) : SourceWithProgress(getHeader(s3_builder_.sample_block, s3_builder_.need_path, s3_builder_.need_file)) , initial_query_id(initial_query_id_) - , s3_source_builder(s3_builder_) - , cli_builder(client_auth_builder_) + , s3b(s3_builder_) + , cab(client_auth_builder_) , read_task_callback(read_task_callback_) { createOrUpdateInnerSource(); @@ -128,7 +128,7 @@ private: } catch (...) { - tryLogCurrentException(&Poco::Logger::get("StorageS3SequentialSource")); + tryLogCurrentException(&Poco::Logger::get(getName())); throw; } } @@ -143,21 +143,15 @@ private: auto client_auth = StorageS3::ClientAuthentificaiton{ next_uri, - cli_builder.access_key_id, - cli_builder.secret_access_key, - cli_builder.max_connections, + cab.access_key_id, + cab.secret_access_key, + cab.max_connections, {}, {}}; - StorageS3::updateClientAndAuthSettings(s3_source_builder.context, client_auth); + StorageS3::updateClientAndAuthSettings(s3b.context, client_auth); inner = std::make_unique( - s3_source_builder.need_path, - s3_source_builder.need_file, - s3_source_builder.format, - s3_source_builder.name, - s3_source_builder.sample_block, - s3_source_builder.context, - s3_source_builder.columns, - s3_source_builder.max_block_size, + s3b.need_path, s3b.need_file, s3b.format, s3b.name, + s3b.sample_block, s3b.context, s3b.columns, s3b.max_block_size, chooseCompressionMethod(client_auth.uri.key, ""), client_auth.client, client_auth.uri.bucket, @@ -167,14 +161,10 @@ private: return true; } - /// This is used to ask about next task String initial_query_id; - - StorageS3SourceBuilder s3_source_builder; - ClientAuthentificationBuilder cli_builder; - + StorageS3SourceBuilder s3b; + ClientAuthentificationBuilder cab; std::unique_ptr inner; - NextTaskCallback read_task_callback; }; @@ -229,28 +219,18 @@ Pipe StorageS3Distributed::read( need_file_column = true; } - StorageS3SourceBuilder s3builder - { - need_path_column, - need_file_column, - format_name, - getName(), - metadata_snapshot->getSampleBlock(), - context, - metadata_snapshot->getColumns(), - max_block_size, + StorageS3SourceBuilder s3builder{ + need_path_column, need_file_column, + format_name, getName(), + metadata_snapshot->getSampleBlock(), context, + metadata_snapshot->getColumns(), max_block_size, compression_method }; return Pipe(std::make_shared( - context.getInitialQueryId(), - context.getNextTaskCallback(), - cli_builder, - s3builder - )); + context.getInitialQueryId(), context.getNextTaskCallback(), cli_builder, s3builder)); } - /// The code from here and below executes on initiator /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) @@ -269,21 +249,33 @@ Pipe StorageS3Distributed::read( connections.emplace_back(std::make_shared( node.host_name, node.port, context.getGlobalContext().getCurrentDatabase(), node.user, node.password, node.cluster, node.cluster_secret, - "StorageS3DistributedInititiator", - Protocol::Compression::Disable, - Protocol::Secure::Disable + "S3DistributedInititiator", + node.compression, + node.secure )); auto remote_query_executor = std::make_shared( *connections.back(), queryToString(query_info.query), header, context, /*throttler=*/nullptr, scalars, Tables(), processed_stage); - pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, false, false, false, false)); + pipes.emplace_back(std::make_shared(remote_query_executor, false, false)); } } metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); return Pipe::unitePipes(std::move(pipes)); } + +QueryProcessingStage::Enum StorageS3Distributed::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const +{ + /// Initiator executes query on remote node. + if (context.getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) { + return QueryProcessingStage::Enum::WithMergeableState; + } + /// Follower just reads the data. + return QueryProcessingStage::Enum::FetchColumns; +} + + } #endif diff --git a/src/Storages/StorageS3Distributed.h b/src/Storages/StorageS3Distributed.h index 13e28d1a7aa..9bfb792766d 100644 --- a/src/Storages/StorageS3Distributed.h +++ b/src/Storages/StorageS3Distributed.h @@ -46,10 +46,7 @@ public: size_t /*max_block_size*/, unsigned /*num_streams*/) override; - QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override - { - return QueryProcessingStage::Enum::WithMergeableState; - } + QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override; NamesAndTypesList getVirtuals() const override;