From fea3aaafe6f6461cc5a0800d8438dff583e96f3a Mon Sep 17 00:00:00 2001
From: Nikita Mikhaylov
Date: Tue, 6 Apr 2021 14:51:16 +0300
Subject: [PATCH] simplify storages3dist

---
 src/Storages/StorageS3Distributed.cpp | 62 +--------------------------
 1 file changed, 2 insertions(+), 60 deletions(-)

diff --git a/src/Storages/StorageS3Distributed.cpp b/src/Storages/StorageS3Distributed.cpp
index 2bf47ab9f9f..12a1f146ad5 100644
--- a/src/Storages/StorageS3Distributed.cpp
+++ b/src/Storages/StorageS3Distributed.cpp
@@ -220,20 +220,6 @@ Pipe StorageS3Distributed::read(
     /// Secondary query, need to read from S3
     if (context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
     {
-        /// Find initiator in cluster
-        Cluster::Address initiator;
-        [&]()
-        {
-            for (const auto & replicas : cluster->getShardsAddresses())
-                for (const auto & node : replicas)
-                    if (node.getHash() == address_hash_or_filename)
-                    {
-                        initiator = node;
-                        return;
-                    }
-        }();
-
-
         bool need_path_column = false;
         bool need_file_column = false;
         for (const auto & column : column_names)
@@ -268,53 +254,9 @@ Pipe StorageS3Distributed::read(
 
     /// The code from here and below executes on initiator
 
-    String hash_of_address;
-    [&]()
-    {
-        for (const auto & replicas : cluster->getShardsAddresses())
-            for (const auto & node : replicas)
-                /// Finding ourselves in cluster
-                if (node.is_local && node.port == context.getTCPPort())
-                {
-                    hash_of_address = node.getHash();
-                    break;
-                }
-    }();
-
-    if (hash_of_address.empty())
-        throw Exception(fmt::format("The initiator must be a part of a cluster {}", cluster_name), ErrorCodes::BAD_ARGUMENTS);
-
-    /// Our purpose to change some arguments of this function to store some relevant
-    /// information. Then we will send changed query to another hosts.
-    /// We got a pointer to table function representation in AST (a pointer to subtree)
-    /// as parameter of TableFunctionRemote::execute and saved its hash value.
-    /// Here we find it in the AST of whole query, change parameter and format it to string.
-    auto remote_query_ast = query_info.query->clone();
-    auto table_expressions_from_whole_query = getTableExpressions(remote_query_ast->as<ASTSelectQuery &>());
-
-    String remote_query;
-    for (const auto & table_expression : table_expressions_from_whole_query)
-    {
-        const auto & table_function_ast = table_expression->table_function;
-        if (table_function_ast->getTreeHash() == tree_hash)
-        {
-            auto & arguments = table_function_ast->children.at(0)->children;
-            auto & bucket = arguments[1]->as<ASTLiteral &>().value.safeGet<String>();
-            /// We rewrite query, and insert a port to connect as a first parameter
-            /// So, we write hash_of_address here as buckey name to find initiator node
-            /// in cluster from config on remote replica
-            bucket = hash_of_address;
-            remote_query = queryToString(remote_query_ast);
-            break;
-        }
-    }
-
-    if (remote_query.empty())
-        throw Exception(fmt::format("There is no table function with hash of AST equals to {}", hash_of_address), ErrorCodes::LOGICAL_ERROR);
-
     /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
     Block header =
-        InterpreterSelectQuery(remote_query_ast, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
+        InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
 
     const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
@@ -339,7 +281,7 @@ Pipe StorageS3Distributed::read(
         ));
         auto stream = std::make_shared<RemoteBlockInputStream>(
             /*connection=*/*connections.back(),
-            /*query=*/remote_query,
+            /*query=*/queryToString(query_info.query),
             /*header=*/header,
             /*context=*/context,
             /*throttler=*/nullptr,
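Note (not part of the patch): before this change the initiator cloned the query AST, located the s3Distributed table function subtree by its tree hash, overwrote the bucket argument with the initiator's own address hash (so remote replicas could find the initiator in their cluster config), and re-serialized the rewritten query; after the change the initiator simply forwards queryToString(query_info.query) unchanged. The sketch below only illustrates that before/after difference; it is a standalone toy with made-up Node/Query/serialize/rewriteAndSerialize names, not ClickHouse's real AST API.

#include <functional>
#include <iostream>
#include <string>
#include <vector>

/// Toy stand-in for an AST node: a function name plus its string arguments.
struct Node
{
    std::string name;
    std::vector<std::string> arguments;

    size_t treeHash() const { return std::hash<std::string>{}(name); }
};

/// A "query" here is just a flat list of nodes, enough to show the idea.
using Query = std::vector<Node>;

std::string serialize(const Query & query)
{
    std::string text;
    for (const auto & node : query)
    {
        text += node.name + "(";
        for (size_t i = 0; i < node.arguments.size(); ++i)
            text += (i ? ", " : "") + node.arguments[i];
        text += ") ";
    }
    return text;
}

/// Old behaviour (removed by the patch): take a copy of the query, find the table
/// function node by hash, overwrite its bucket slot with the initiator's address
/// hash, and serialize the rewritten copy for the workers.
std::string rewriteAndSerialize(Query query, size_t tree_hash, const std::string & hash_of_address)
{
    for (auto & node : query)
        if (node.treeHash() == tree_hash)
        {
            node.arguments.at(0) = hash_of_address;
            break;
        }
    return serialize(query);
}

int main()
{
    Query query = {{"s3Distributed", {"mybucket", "*.csv", "CSV"}}};
    size_t hash = query[0].treeHash();

    /// Before the patch: the query sent to workers had the bucket argument replaced.
    std::cout << "old: " << rewriteAndSerialize(query, hash, "0123abcd") << '\n';
    /// After the patch: the original query text is forwarded as-is.
    std::cout << "new: " << serialize(query) << '\n';
}

The patch removes exactly the "old" branch: with the simplified path the workers no longer need the bucket argument repurposed as an initiator locator, so no AST rewriting or hash lookup is required on either side.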