simplify storages3dist

Nikita Mikhaylov 2021-04-06 14:51:16 +03:00
parent 4843f86329
commit fea3aaafe6


@@ -220,20 +220,6 @@ Pipe StorageS3Distributed::read(
     /// Secondary query, need to read from S3
     if (context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
     {
-        /// Find initiator in cluster
-        Cluster::Address initiator;
-        [&]()
-        {
-            for (const auto & replicas : cluster->getShardsAddresses())
-                for (const auto & node : replicas)
-                    if (node.getHash() == address_hash_or_filename)
-                    {
-                        initiator = node;
-                        return;
-                    }
-        }();
-
         bool need_path_column = false;
         bool need_file_column = false;
         for (const auto & column : column_names)
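The block removed above shows a pattern used in this function: an immediately-invoked lambda acts as a labelled break for the nested shard/replica loops while searching the cluster for the node whose address hash matches. A minimal sketch of that pattern, using simplified stand-in types rather than the real Cluster::Address machinery:

#include <optional>
#include <string>
#include <vector>

struct Node { std::string host; std::string hash; };
using Shard = std::vector<Node>;

std::optional<Node> findNodeByHash(const std::vector<Shard> & shards, const std::string & wanted_hash)
{
    std::optional<Node> found;
    /// The immediately-invoked lambda lets a single `return` exit both loops at once.
    [&]()
    {
        for (const auto & replicas : shards)
            for (const auto & node : replicas)
                if (node.hash == wanted_hash)
                {
                    found = node;
                    return;
                }
    }();
    return found;
}

The same trick appears again in the next hunk, where the initiator searches the cluster for itself.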
@@ -268,53 +254,9 @@ Pipe StorageS3Distributed::read(
     /// The code from here and below executes on the initiator
 
-    String hash_of_address;
-    [&]()
-    {
-        for (const auto & replicas : cluster->getShardsAddresses())
-            for (const auto & node : replicas)
-                /// Finding ourselves in the cluster
-                if (node.is_local && node.port == context.getTCPPort())
-                {
-                    hash_of_address = node.getHash();
-                    break;
-                }
-    }();
-
-    if (hash_of_address.empty())
-        throw Exception(fmt::format("The initiator must be a part of the cluster {}", cluster_name), ErrorCodes::BAD_ARGUMENTS);
-
-    /// Our purpose is to change some arguments of this table function to store relevant
-    /// information, and then send the changed query to the other hosts.
-    /// We got a pointer to the table function representation in the AST (a pointer to a subtree)
-    /// as a parameter of TableFunctionRemote::execute and saved its hash value.
-    /// Here we find it in the AST of the whole query, change the parameter and format the query to a string.
-    auto remote_query_ast = query_info.query->clone();
-    auto table_expressions_from_whole_query = getTableExpressions(remote_query_ast->as<ASTSelectQuery &>());
-
-    String remote_query;
-    for (const auto & table_expression : table_expressions_from_whole_query)
-    {
-        const auto & table_function_ast = table_expression->table_function;
-        if (table_function_ast->getTreeHash() == tree_hash)
-        {
-            auto & arguments = table_function_ast->children.at(0)->children;
-            auto & bucket = arguments[1]->as<ASTLiteral &>().value.safeGet<String>();
-            /// We rewrite the query: hash_of_address is written here as the bucket name,
-            /// so the remote replica can find the initiator node in the cluster from its config.
-            bucket = hash_of_address;
-            remote_query = queryToString(remote_query_ast);
-            break;
-        }
-    }
-
-    if (remote_query.empty())
-        throw Exception(fmt::format("There is no table function with an AST tree hash equal to {}", hash_of_address), ErrorCodes::LOGICAL_ERROR);
-
     /// Calculate the header. This is significant, because some columns could be thrown away in some cases, like a query with count(*)
     Block header =
-        InterpreterSelectQuery(remote_query_ast, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
+        InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
 
     const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
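This hunk is the heart of the simplification. Previously the initiator cloned the query AST, located its own table function subtree via the tree hash saved in TableFunctionRemote::execute, replaced the bucket literal with its own address hash, and serialized the result as remote_query; after this commit both the header and the outgoing query are derived from the untouched query_info.query. A toy model of the removed match-by-hash-and-rewrite step, with a simplified AST standing in for the ClickHouse IAST classes:

#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <vector>

struct Ast
{
    std::string value;
    std::vector<std::shared_ptr<Ast>> children;

    /// Structural hash of the whole subtree, so a node can be matched again in a clone.
    size_t treeHash() const
    {
        size_t h = std::hash<std::string>{}(value);
        for (const auto & child : children)
            h ^= child->treeHash() + 0x9e3779b9 + (h << 6) + (h >> 2);
        return h;
    }
};

/// Find the subtree whose hash matches the one saved earlier and overwrite its
/// second argument (the bucket literal), as the removed initiator code did.
bool rewriteBucket(Ast & node, size_t saved_hash, const std::string & new_bucket)
{
    if (node.treeHash() == saved_hash && node.children.size() >= 2)
    {
        node.children[1]->value = new_bucket;
        return true;
    }
    for (auto & child : node.children)
        if (rewriteBucket(*child, saved_hash, new_bucket))
            return true;
    return false;
}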
@@ -339,7 +281,7 @@ Pipe StorageS3Distributed::read(
         ));
 
         auto stream = std::make_shared<RemoteBlockInputStream>(
            /*connection=*/*connections.back(),
-           /*query=*/remote_query,
+           /*query=*/queryToString(query_info.query),
            /*header=*/header,
            /*context=*/context,
            /*throttler=*/nullptr,
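With remote_query gone, this final hunk just serializes the original query at the call site. A compilable toy mirroring the shape of the new call; Connection, RemoteStream and this queryToString are placeholders, not the real ClickHouse classes:

#include <iostream>
#include <memory>
#include <string>

struct Connection { std::string address; };

struct RemoteStream
{
    RemoteStream(const Connection & connection, const std::string & query)
    {
        std::cout << "sending to " << connection.address << ": " << query << '\n';
    }
};

/// Stand-in for ClickHouse's queryToString(ASTPtr).
std::string queryToString(const std::string & ast) { return ast; }

int main()
{
    Connection connection{"replica-1:9000"};
    std::string original_query = "SELECT count() FROM ...";

    /// After the commit: the original query text is sent verbatim, with no per-replica rewrite.
    auto stream = std::make_shared<RemoteStream>(connection, queryToString(original_query));
}

The design consequence is that secondary queries arrive identical to what the client sent, so the replica-side lookup of the initiator by bucket hash, removed in the first hunk, is no longer needed.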