CLICKHOUSE-3837 fix mistakes

This commit is contained in:
VadimPE 2018-07-25 15:31:47 +03:00
parent ca40a085b1
commit 867a3ebfae
10 changed files with 94 additions and 86 deletions

View File

@ -47,7 +47,8 @@ public:
*/
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
std::vector<TryResult> getManyForTableFunc(const Settings * settings, PoolMode pool_mode);
/// The same as getMany(), but return std::vector<TryResult>.
std::vector<TryResult> getManyForTableFunction(const Settings * settings, PoolMode pool_mode);
using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;

View File

@ -81,18 +81,12 @@ void SelectStreamFactory::createForShard(
res.emplace_back(createLocalStream(query_ast, context, processed_stage));
};
auto emplace_remote_stream_for_database = [&]()
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
};
auto emplace_remote_stream_for_func = [&]()
auto emplace_remote_stream = [&]()
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
};
@ -119,10 +113,7 @@ void SelectStreamFactory::createForShard(
"There is no table " << main_table.database << "." << main_table.table
<< " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");
if (table_func_ptr)
emplace_remote_stream_for_func();
else
emplace_remote_stream_for_database();
emplace_remote_stream();
return;
}
else
@ -170,10 +161,7 @@ void SelectStreamFactory::createForShard(
if (shard_info.pool)
{
/// If we cannot fallback, then we cannot use local replica. Try our luck with remote replicas.
if (table_func_ptr)
emplace_remote_stream_for_func();
else
emplace_remote_stream_for_database();
emplace_remote_stream();
return;
}
else
@ -203,7 +191,7 @@ void SelectStreamFactory::createForShard(
try
{
if (table_func_ptr)
try_results = pool->getManyForTableFunc(&context.getSettingsRef(), PoolMode::GET_MANY);
try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY);
else
try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
}
@ -241,10 +229,7 @@ void SelectStreamFactory::createForShard(
res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
}
else
if (table_func_ptr)
emplace_remote_stream_for_func();
else
emplace_remote_stream_for_database();
emplace_remote_stream();
}
}

View File

@ -13,12 +13,14 @@ namespace ClusterProxy
class SelectStreamFactory final : public IStreamFactory
{
public:
/// Database in a query.
SelectStreamFactory(
const Block & header_,
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables);
/// TableFunction in a query.
SelectStreamFactory(
const Block & header_,
QueryProcessingStage::Enum processed_stage_,

View File

@ -53,12 +53,15 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context)
{
{
/// Branch with string in qery.
if (typeid_cast<const ASTLiteral *>(node.get()))
return node;
/// Branch with TableFunction in query.
if (auto table_func_ptr = typeid_cast<ASTFunction *>(node.get()))
if (TableFunctionFactory::instance().isTableFunctionName(table_func_ptr->name))
return node;
return std::make_shared<ASTLiteral>(evaluateConstantExpression(node, context).first);

View File

@ -356,7 +356,7 @@ void ASTSelectQuery::setDatabaseIfNeeded(const String & database_name)
}
void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const String & table_name, ASTPtr table_function_ptr)
void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const String & table_name)
{
ASTTableExpression * table_expression = getFirstTableExpression(*this);
@ -373,26 +373,42 @@ void ASTSelectQuery::replaceDatabaseAndTable(const String & database_name, const
table_expression = table_expr.get();
}
if (table_function_ptr) {
table_expression->table_function = table_function_ptr;
table_expression->database_and_table_name = nullptr;
ASTPtr table = std::make_shared<ASTIdentifier>(table_name, ASTIdentifier::Table);
if (!database_name.empty())
{
ASTPtr database = std::make_shared<ASTIdentifier>(database_name, ASTIdentifier::Database);
table_expression->database_and_table_name = std::make_shared<ASTIdentifier>(database_name + "." + table_name, ASTIdentifier::Table);
table_expression->database_and_table_name->children = {database, table};
}
else
{
ASTPtr table = std::make_shared<ASTIdentifier>(table_name, ASTIdentifier::Table);
if (!database_name.empty())
{
ASTPtr database = std::make_shared<ASTIdentifier>(database_name, ASTIdentifier::Database);
table_expression->database_and_table_name = std::make_shared<ASTIdentifier>(database_name + "." + table_name, ASTIdentifier::Table);
table_expression->database_and_table_name->children = {database, table};
}
else
{
table_expression->database_and_table_name = std::make_shared<ASTIdentifier>(table_name, ASTIdentifier::Table);
}
table_expression->database_and_table_name = std::make_shared<ASTIdentifier>(table_name, ASTIdentifier::Table);
}
}
void ASTSelectQuery::addTableFunction(ASTPtr & table_function_ptr)
{
ASTTableExpression * table_expression = getFirstTableExpression(*this);
if (!table_expression)
{
auto tables_list = std::make_shared<ASTTablesInSelectQuery>();
auto element = std::make_shared<ASTTablesInSelectQueryElement>();
auto table_expr = std::make_shared<ASTTableExpression>();
element->table_expression = table_expr;
element->children.emplace_back(table_expr);
tables_list->children.emplace_back(element);
tables = tables_list;
children.emplace_back(tables_list);
table_expression = table_expr.get();
}
table_expression->table_function = table_function_ptr;
table_expression->database_and_table_name = nullptr;
}
};

View File

@ -46,7 +46,8 @@ public:
bool array_join_is_left() const;
bool final() const;
void setDatabaseIfNeeded(const String & database_name);
void replaceDatabaseAndTable(const String & database_name, const String & table_name, ASTPtr table_function_name = nullptr);
void replaceDatabaseAndTable(const String & database_name, const String & table_name);
void addTableFunction(ASTPtr & table_function_ptr);
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -65,12 +65,15 @@ namespace ErrorCodes
namespace
{
/// select query has database and table names as AST pointers
/// Creates a copy of query, changes database and table names.
/// select query has database, table and table function names as AST pointers
/// Creates a copy of query, changes database, table and table function names.
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table, ASTPtr table_function_ptr = nullptr)
{
auto modified_query_ast = query->clone();
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table, table_function_ptr);
if (table_function_ptr)
typeid_cast<ASTSelectQuery &>(*modified_query_ast).addTableFunction(table_function_ptr);
else
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
return modified_query_ast;
}
@ -154,7 +157,7 @@ StorageDistributed::StorageDistributed(
bool attach)
: IStorage{columns_},
table_name(table_name_),
remote_database(remote_database_), remote_table(remote_table_), remote_table_function_ptr(nullptr),
remote_database(remote_database_), remote_table(remote_table_),
context(context_), cluster_name(context.getMacros()->expand(cluster_name_)), has_sharding_key(sharding_key_),
sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, getColumns().getAllPhysical()).getActions(false) : nullptr),
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
@ -174,15 +177,13 @@ StorageDistributed::StorageDistributed(
const String & database_name,
const String & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_,
const String & remote_table_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach)
: StorageDistributed(database_name, table_name_, columns_, remote_database_, remote_table_, cluster_name_, context_, sharding_key_, data_path_, attach)
: StorageDistributed(database_name, table_name_, columns_, String{}, String{}, cluster_name_, context_, sharding_key_, data_path_, attach)
{
remote_table_function_ptr = remote_table_function_ptr_;
}
@ -208,14 +209,12 @@ StoragePtr StorageDistributed::createWithOwnCluster(
StoragePtr StorageDistributed::createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ASTPtr remote_table_function_ptr_, /// Table function ptr.
ASTPtr & remote_table_function_ptr_,
ClusterPtr & owned_cluster_,
const Context & context_)
{
auto res = ext::shared_ptr_helper<StorageDistributed>::create(
String{}, table_name_, columns_, remote_database_, remote_table_, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
String{}, table_name_, columns_, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
res->owned_cluster = owned_cluster_;
@ -249,23 +248,17 @@ BlockInputStreams StorageDistributed::read(
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
Block header = materializeBlock(InterpreterSelectQuery(query_info.query, context, Names{}, processed_stage).getSampleBlock());
Block header = materializeBlock(InterpreterSelectQuery(query_info.query, context, {}, processed_stage).getSampleBlock());
if (remote_table_function_ptr)
{
ClusterProxy::SelectStreamFactory select_stream_factory(
header, processed_stage, remote_table_function_ptr, context.getExternalTables());
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
}
else
{
ClusterProxy::SelectStreamFactory select_stream_factory(
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr ?
ClusterProxy::SelectStreamFactory(
header, processed_stage, remote_table_function_ptr, context.getExternalTables())
: ClusterProxy::SelectStreamFactory(
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
return ClusterProxy::executeQuery(
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
}
}

View File

@ -9,6 +9,7 @@
#include <Interpreters/Settings.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTFunction.h>
#include <common/logger_useful.h>
@ -44,9 +45,7 @@ public:
static StoragePtr createWithOwnCluster(
const std::string & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ASTPtr remote_table_function_ptr_, /// Table function ptr.
ASTPtr & remote_table_function_ptr_, /// Table function ptr.
ClusterPtr & owned_cluster_,
const Context & context_);
@ -156,13 +155,11 @@ protected:
const ASTPtr & sharding_key_,
const String & data_path_,
bool attach);
StorageDistributed(
const String & database_name,
const String & table_name_,
const ColumnsDescription & columns_,
const String & remote_database_,
const String & remote_table_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
const Context & context_,

View File

@ -40,7 +40,7 @@ ColumnsDescription getStructureOfRemoteTable(
return TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context)->getColumns();
}
auto table_func_name = table_func_ptr->getAliasOrColumnName();
auto table_func_name = queryToString(table_func_ptr);
query = "DESC TABLE " + table_func_name;
}
else
@ -63,7 +63,7 @@ ColumnsDescription getStructureOfRemoteTable(
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
ParserExpression expr_parser;
while (Block current = input->read())
{
ColumnPtr name = current.getByName("name").column;

View File

@ -196,9 +196,9 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
String cluster_name;
String cluster_description;
String remote_database = "";
String remote_table = "";
ASTPtr remote_table_function_ptr = nullptr;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
String username;
String password;
@ -232,7 +232,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
auto table_function = static_cast<ASTFunction *>(args[arg_num].get());
const auto table_function = static_cast<ASTFunction *>(args[arg_num].get());
if (TableFunctionFactory::instance().isTableFunctionName(table_function->name))
{
@ -314,14 +314,24 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password, context.getTCPPort(), false);
}
auto res = StorageDistributed::createWithOwnCluster(
getName(),
getStructureOfRemoteTable(*cluster, remote_database, remote_table, context, remote_table_function_ptr),
remote_database,
remote_table,
remote_table_function_ptr,
cluster,
context);
auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_database, remote_table, context, remote_table_function_ptr);
StoragePtr res = remote_table_function_ptr ?
res = StorageDistributed::createWithOwnCluster(
getName(),
structure_remote_table,
remote_table_function_ptr,
cluster,
context)
: res = StorageDistributed::createWithOwnCluster(
getName(),
structure_remote_table,
remote_database,
remote_table,
cluster,
context);
res->startup();
return res;
}