mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
dbms: Sending only required external tables to remote servers [METR-10071]
This commit is contained in:
parent
e426667931
commit
fff1593bd9
@ -100,7 +100,7 @@ public:
|
||||
Block getSelectSampleBlock();
|
||||
|
||||
/// Все новые временные таблицы, полученные при выполнении подзапросов GLOBAL IN.
|
||||
std::vector<StoragePtr> external_tables;
|
||||
Tables external_tables;
|
||||
private:
|
||||
typedef std::set<String> NamesSet;
|
||||
|
||||
@ -263,7 +263,8 @@ private:
|
||||
|
||||
/// Обходит запрос и сохраняет найденные глобальные функции (например GLOBAL IN)
|
||||
void findGlobalFunctions(ASTPtr & ast, std::vector<ASTPtr> & global_nodes);
|
||||
|
||||
void findExternalTables(ASTPtr & ast);
|
||||
|
||||
/// Превратить перечисление значений или подзапрос в ASTSet. node - функция in или notIn.
|
||||
void makeSet(ASTFunction * node, const Block & sample_block);
|
||||
/// Выполнить подзапрос в секции GLOBAL IN и запомнить результат во временную таблицу типа memory
|
||||
|
@ -516,6 +516,34 @@ void ExpressionAnalyzer::findGlobalFunctions(ASTPtr & ast, std::vector<ASTPtr> &
|
||||
}
|
||||
|
||||
|
||||
void ExpressionAnalyzer::findExternalTables(ASTPtr & ast)
|
||||
{
|
||||
/// Рекурсивные вызовы. Намеренно опускаемся в подзапросы.
|
||||
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
|
||||
findExternalTables(*it);
|
||||
|
||||
/// Если идентификатор типа таблица
|
||||
StoragePtr external_storage;
|
||||
if (ASTIdentifier * node = dynamic_cast<ASTIdentifier *>(&*ast))
|
||||
if (node->kind == ASTIdentifier::Kind::Table)
|
||||
if (external_storage = context.tryGetExternalTable(node->name))
|
||||
external_tables[node->name] = external_storage;
|
||||
|
||||
if (ASTFunction * node = dynamic_cast<ASTFunction *>(&*ast))
|
||||
{
|
||||
if (node->name == "globalIn" || node->name == "globalNotIn" || node->name == "In" || node->name == "NotIn")
|
||||
{
|
||||
IAST & args = *node->arguments;
|
||||
ASTPtr & arg = args.children[1];
|
||||
/// Если имя таблицы для селекта
|
||||
if (ASTIdentifier * id = dynamic_cast<ASTIdentifier *>(&*arg))
|
||||
if (external_storage = context.tryGetExternalTable(id->name))
|
||||
external_tables[id->name] = external_storage;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ExpressionAnalyzer::addExternalStorage(ASTFunction * node, size_t & name_id)
|
||||
{
|
||||
IAST & args = *node->arguments;
|
||||
@ -542,8 +570,13 @@ void ExpressionAnalyzer::addExternalStorage(ASTFunction * node, size_t & name_id
|
||||
{
|
||||
ParserSelectQuery parser;
|
||||
|
||||
if (context.tryGetExternalTable(table->name))
|
||||
StoragePtr existing_storage;
|
||||
|
||||
if (existing_storage = context.tryGetExternalTable(table->name))
|
||||
{
|
||||
external_tables[table->name] = existing_storage;
|
||||
return;
|
||||
}
|
||||
|
||||
String query = "SELECT * FROM " + table->name;
|
||||
const char * begin = query.data();
|
||||
@ -573,7 +606,7 @@ void ExpressionAnalyzer::addExternalStorage(ASTFunction * node, size_t & name_id
|
||||
ast_ident->kind = ASTIdentifier::Table;
|
||||
ast_ident->name = external_storage->getTableName();
|
||||
arg = ast_ident;
|
||||
external_tables.push_back(external_storage);
|
||||
external_tables[external_table_name] = external_storage;
|
||||
}
|
||||
else
|
||||
throw Exception("GLOBAL [NOT] IN supports only SELECT data.", ErrorCodes::BAD_ARGUMENTS);
|
||||
@ -1177,6 +1210,8 @@ void ExpressionAnalyzer::processGlobalOperations()
|
||||
++id;
|
||||
addExternalStorage(dynamic_cast<ASTFunction *>(&*global_nodes[i]), id);
|
||||
}
|
||||
|
||||
findExternalTables(ast);
|
||||
}
|
||||
|
||||
bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool only_types)
|
||||
|
@ -83,7 +83,8 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndType
|
||||
|
||||
/// Сохраняем в query context новые временные таблицы
|
||||
for (auto & it : query_analyzer->external_tables)
|
||||
context.addExternalTable(it->getTableName(), it);
|
||||
if (!(context.tryGetExternalTable(it.first)))
|
||||
context.addExternalTable(it.first, it.second);
|
||||
|
||||
if (input_)
|
||||
streams.push_back(input_);
|
||||
@ -504,7 +505,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
|
||||
if (!interpreter_subquery)
|
||||
{
|
||||
if (storage->isRemote())
|
||||
storage->storeExternalTables(context.getExternalTables());
|
||||
storage->storeExternalTables(query_analyzer->external_tables);
|
||||
streams = storage->read(required_columns, query_ptr, settings_for_storage, from_stage, settings.max_block_size, settings.max_threads);
|
||||
for (auto stream : streams)
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user