Make better

This commit is contained in:
avogar 2022-10-18 13:03:55 +00:00
parent 75c4ef0be7
commit 2c2f977096
7 changed files with 69 additions and 74 deletions

View File

@ -156,20 +156,4 @@ TablesWithColumns getDatabaseAndTablesWithColumns(
return tables_with_columns;
}
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
{
auto * select_query = query->as<ASTSelectQuery>();
if (!select_query || !select_query->tables())
return nullptr;
auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
if (!table_expression->table_function)
return nullptr;
auto * table_function = table_expression->table_function->as<ASTFunction>();
return table_function->arguments->as<ASTExpressionList>();
}
}

View File

@ -24,6 +24,4 @@ ASTPtr extractTableExpression(const ASTSelectQuery & select, size_t table_number
TablesWithColumns getDatabaseAndTablesWithColumns(
const ASTTableExprConstPtrs & table_expressions, ContextPtr context, bool include_alias_cols, bool include_materialized_cols);
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);
}

View File

@ -11,7 +11,6 @@
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
@ -26,6 +25,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/StorageDictionary.h>
#include <Storages/addColumnsStructureToQueryWithClusterEngine.h>
#include <memory>
@ -38,29 +38,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static ASTPtr addColumnsStructureToQuery(const ASTPtr & query, const String & structure)
{
/// Add argument with table structure to hdfsCluster table function in select query.
auto result_query = query->clone();
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(result_query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function hdfsCluster, got '{}'", queryToString(query));
auto structure_literal = std::make_shared<ASTLiteral>(structure);
if (expression_list->children.size() != 2 && expression_list->children.size() != 3)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 or 3 arguments in hdfsCluster table functions, got {}", expression_list->children.size());
if (expression_list->children.size() == 2)
{
auto format_literal = std::make_shared<ASTLiteral>("auto");
expression_list->children.push_back(format_literal);
}
expression_list->children.push_back(structure_literal);
return result_query;
}
StorageHDFSCluster::StorageHDFSCluster(
ContextPtr context_,
String cluster_name_,
@ -85,7 +62,7 @@ StorageHDFSCluster::StorageHDFSCluster(
{
auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
storage_metadata.setColumns(columns);
need_to_add_structure_to_query = true;
add_columns_structure_to_query = true;
}
else
storage_metadata.setColumns(columns_);
@ -122,10 +99,10 @@ Pipe StorageHDFSCluster::read(
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
auto query_to_send = query_info.original_query;
if (need_to_add_structure_to_query)
query_to_send = addColumnsStructureToQuery(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()));
auto query_to_send = query_info.original_query->clone();
if (add_columns_structure_to_query)
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());
for (const auto & replicas : cluster->getShardsAddresses())
{

View File

@ -44,7 +44,7 @@ private:
String uri;
String format_name;
String compression_method;
bool need_to_add_structure_to_query = false;
bool add_columns_structure_to_query = false;
};

View File

@ -14,7 +14,6 @@
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
@ -27,6 +26,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/StorageDictionary.h>
#include <Storages/addColumnsStructureToQueryWithClusterEngine.h>
#include <Common/logger_useful.h>
#include <aws/core/auth/AWSCredentials.h>
@ -44,28 +44,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static ASTPtr addColumnsStructureToQuery(const ASTPtr & query, const String & structure)
{
/// Add argument with table structure to s3Cluster table function in select query.
auto result_query = query->clone();
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(result_query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function from s3Cluster, got '{}'", queryToString(query));
auto structure_literal = std::make_shared<ASTLiteral>(structure);
if (expression_list->children.size() < 2 || expression_list->children.size() > 5)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 to 5 arguments in s3Cluster table functions, got {}", expression_list->children.size());
if (expression_list->children.size() == 2 || expression_list->children.size() == 4)
{
auto format_literal = std::make_shared<ASTLiteral>("auto");
expression_list->children.push_back(format_literal);
}
expression_list->children.push_back(structure_literal);
return result_query;
}
StorageS3Cluster::StorageS3Cluster(
const StorageS3ClusterConfiguration & configuration_,
const StorageID & table_id_,
@ -139,10 +117,10 @@ Pipe StorageS3Cluster::read(
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
ASTPtr query_to_send = query_info.original_query;
ASTPtr query_to_send = query_info.original_query->clone();
if (need_to_add_structure_to_query)
query_to_send = addColumnsStructureToQuery(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()));
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 5, getName());
for (const auto & replicas : cluster->getShardsAddresses())
{

View File

@ -0,0 +1,47 @@
#include <Storages/addColumnsStructureToQueryWithClusterEngine.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
namespace DB
{
static ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
{
auto * select_query = query->as<ASTSelectQuery>();
if (!select_query || !select_query->tables())
return nullptr;
auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
if (!table_expression->table_function)
return nullptr;
auto * table_function = table_expression->table_function->as<ASTFunction>();
return table_function->arguments->as<ASTExpressionList>();
}
void addColumnsStructureToQueryWithClusterEngine(ASTPtr & query, const String & structure, size_t max_arguments, const String & function_name)
{
/// Add argument with table structure to s3Cluster table function in select query.
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function {}, got '{}'", function_name, queryToString(query));
auto structure_literal = std::make_shared<ASTLiteral>(structure);
if (expression_list->children.size() < 2 || expression_list->children.size() > max_arguments)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 to {} arguments in {} table functions, got {}", function_name, max_arguments, expression_list->children.size());
if (expression_list->children.size() == 2 || expression_list->children.size() == max_arguments - 1)
{
auto format_literal = std::make_shared<ASTLiteral>("auto");
expression_list->children.push_back(format_literal);
}
expression_list->children.push_back(structure_literal);
}
}

View File

@ -0,0 +1,11 @@
#pragma once
#include <Parsers/IAST.h>
namespace DB
{
/// Add structure argument for queries with s3Cluster/hdfsCluster table function.
void addColumnsStructureToQueryWithClusterEngine(ASTPtr & query, const String & structure, size_t max_arguments, const String & function_name);
}