mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
dbms: fixed error with GLOBAL IN when one of shard is localhost [#METR-14557].
This commit is contained in:
parent
11eaf0300a
commit
715f5bf8d1
51
dbms/include/DB/DataStreams/LazyBlockInputStream.h
Normal file
51
dbms/include/DB/DataStreams/LazyBlockInputStream.h
Normal file
@ -0,0 +1,51 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Инициализировать другой источник при первом вызове read, и затем использовать его.
|
||||
* Это нужно, например, для чтения из таблицы, которая будет заполнена
|
||||
* после создания объекта LazyBlockInputStream, но до первого вызова read.
|
||||
*/
|
||||
class LazyBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
using Generator = std::function<BlockInputStreamPtr()>;
|
||||
|
||||
LazyBlockInputStream(Generator generator_)
|
||||
: generator(generator_) {}
|
||||
|
||||
String getName() const override { return "LazyBlockInputStream"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Lazy(" << this << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
if (!input)
|
||||
{
|
||||
input = generator();
|
||||
|
||||
if (!input)
|
||||
return Block();
|
||||
|
||||
children.push_back(input);
|
||||
}
|
||||
|
||||
return input->read();
|
||||
}
|
||||
|
||||
private:
|
||||
Generator generator;
|
||||
BlockInputStreamPtr input;
|
||||
};
|
||||
|
||||
}
|
@ -75,8 +75,6 @@ public:
|
||||
Block getSampleBlock();
|
||||
|
||||
private:
|
||||
typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr;
|
||||
|
||||
void init(BlockInputStreamPtr input, const Names & required_column_names = Names(), const NamesAndTypesList & table_column_names = NamesAndTypesList());
|
||||
void basicInit(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names);
|
||||
void initQueryAnalyzer();
|
||||
@ -135,7 +133,7 @@ private:
|
||||
size_t original_max_threads; /// В settings настройка max_threads может быть изменена. В original_max_threads сохраняется изначальное значение.
|
||||
QueryProcessingStage::Enum to_stage;
|
||||
size_t subquery_depth;
|
||||
ExpressionAnalyzerPtr query_analyzer;
|
||||
std::unique_ptr<ExpressionAnalyzer> query_analyzer;
|
||||
BlockInputStreams streams;
|
||||
|
||||
/** Цепочка UNION ALL может иметь длину 1 (в таком случае имеется просто один запрос SELECT)
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <DB/DataTypes/DataTypeTuple.h>
|
||||
#include <DB/DataTypes/DataTypeExpression.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
|
||||
#include <DB/Columns/ColumnSet.h>
|
||||
#include <DB/Columns/ColumnExpression.h>
|
||||
|
||||
@ -26,6 +27,7 @@
|
||||
#include <DB/Storages/StorageMemory.h>
|
||||
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
||||
|
||||
#include <DB/DataStreams/LazyBlockInputStream.h>
|
||||
#include <DB/DataStreams/copyData.h>
|
||||
|
||||
#include <DB/Common/typeid_cast.h>
|
||||
@ -586,7 +588,7 @@ static SharedPtr<InterpreterSelectQuery> interpretSubquery(
|
||||
/// create ASTSelectQuery for "SELECT * FROM table" as if written by hand
|
||||
const auto select_query = new ASTSelectQuery;
|
||||
query = select_query;
|
||||
|
||||
|
||||
const auto select_expression_list = new ASTExpressionList;
|
||||
select_query->select_expression_list = select_expression_list;
|
||||
select_query->children.emplace_back(select_query->select_expression_list);
|
||||
@ -637,7 +639,6 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
|
||||
NamesAndTypesListPtr columns = new NamesAndTypesList(sample.getColumnsList());
|
||||
|
||||
String external_table_name = "_data" + toString(external_table_id);
|
||||
++external_table_id;
|
||||
|
||||
/** Заменяем подзапрос на имя временной таблицы.
|
||||
* Именно в таком виде, запрос отправится на удалённый сервер.
|
||||
@ -699,7 +700,37 @@ void ExpressionAnalyzer::makeSet(ASTFunction * node, const Block & sample_block)
|
||||
* - в этой функции видно выражение IN _data1.
|
||||
*/
|
||||
if (!subquery_for_set.source)
|
||||
subquery_for_set.source = interpretSubquery(arg, context, subquery_depth)->execute();
|
||||
{
|
||||
auto interpreter = interpretSubquery(arg, context, subquery_depth);
|
||||
subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute(); });
|
||||
|
||||
/** Зачем используется LazyBlockInputStream?
|
||||
*
|
||||
* Дело в том, что при обработке запроса вида
|
||||
* SELECT ... FROM remote_test WHERE column GLOBAL IN (subquery),
|
||||
* если распределённая таблица remote_test содержит в качестве одного из серверов localhost,
|
||||
* то запрос будет ещё раз интерпретирован локально (а не отправлен по TCP, как в случае удалённого сервера).
|
||||
*
|
||||
* Конвейер выполнения запроса будет такой:
|
||||
* CreatingSets
|
||||
* выполнение подзапроса subquery, заполнение временной таблицы _data1 (1)
|
||||
* CreatingSets
|
||||
* чтение из таблицы _data1, создание множества (2)
|
||||
* чтение из таблицы, подчинённой remote_test.
|
||||
*
|
||||
* (Вторая часть конвейера под CreatingSets - это повторная интерпретация запроса внутри StorageDistributed,
|
||||
* запрос отличается тем, что имя БД и таблицы заменены на подчинённые, а также подзапрос заменён на _data1.)
|
||||
*
|
||||
* Но при создании конвейера, при создании источника (2), будет обнаружено, что таблица _data1 пустая
|
||||
* (потому что запрос ещё не начал выполняться), и будет возвращён в качестве источника пустой источник.
|
||||
* И затем, при выполнении запроса, на шаге (2), будет создано пустое множество.
|
||||
*
|
||||
* Поэтому, мы делаем инициализацию шага (2) ленивой
|
||||
* - чтобы она произошла только после выполнения шага (1), на котором нужная таблица будет заполнена.
|
||||
*
|
||||
* Замечание: это решение не очень хорошее, надо подумать лучше.
|
||||
*/
|
||||
}
|
||||
|
||||
subquery_for_set.set = ast_set->set;
|
||||
arg = ast_set_ptr;
|
||||
@ -1397,7 +1428,10 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
|
||||
* - в этой функции видно выражение JOIN _data1.
|
||||
*/
|
||||
if (!subquery_for_set.source)
|
||||
subquery_for_set.source = interpretSubquery(ast_join.table, context, subquery_depth, required_joined_columns)->execute();
|
||||
{
|
||||
auto interpreter = interpretSubquery(ast_join.table, context, subquery_depth, required_joined_columns);
|
||||
subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute(); });
|
||||
}
|
||||
|
||||
subquery_for_set.join = join;
|
||||
}
|
||||
@ -1762,7 +1796,7 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
|
||||
else if (typeid_cast<const ASTSubquery *>(node.table.get()))
|
||||
{
|
||||
const auto & table = node.table->children.at(0);
|
||||
nested_result_sample = ExpressionAnalyzer(table, context, subquery_depth + 1).getSelectSampleBlock();
|
||||
nested_result_sample = ExpressionAnalyzer(table, context, subquery_depth + 1).getSelectSampleBlock();
|
||||
}
|
||||
|
||||
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
|
||||
|
@ -109,7 +109,7 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
|
||||
if (context.getColumns().empty())
|
||||
throw Exception("There are no available columns", ErrorCodes::THERE_IS_NO_COLUMN);
|
||||
|
||||
query_analyzer = new ExpressionAnalyzer(query_ptr, context, storage, subquery_depth, true);
|
||||
query_analyzer.reset(new ExpressionAnalyzer(query_ptr, context, storage, subquery_depth, true));
|
||||
|
||||
/// Сохраняем в query context новые временные таблицы
|
||||
for (auto & it : query_analyzer->getExternalTables())
|
||||
@ -141,9 +141,9 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
|
||||
|
||||
void InterpreterSelectQuery::initQueryAnalyzer()
|
||||
{
|
||||
query_analyzer = new ExpressionAnalyzer(query_ptr, context, storage, subquery_depth, true);
|
||||
query_analyzer.reset(new ExpressionAnalyzer(query_ptr, context, storage, subquery_depth, true));
|
||||
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
|
||||
p->query_analyzer = new ExpressionAnalyzer(p->query_ptr, p->context, p->storage, p->subquery_depth, true);
|
||||
p->query_analyzer.reset(new ExpressionAnalyzer(p->query_ptr, p->context, p->storage, p->subquery_depth, true));
|
||||
}
|
||||
|
||||
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_,
|
||||
|
@ -164,8 +164,7 @@ BlockInputStreams StorageDistributed::read(
|
||||
for (auto & conn_pool : cluster.pools)
|
||||
res.emplace_back(new RemoteBlockInputStream{
|
||||
conn_pool, modified_query, &new_settings,
|
||||
external_tables, processed_stage, context
|
||||
});
|
||||
external_tables, processed_stage, context});
|
||||
|
||||
/// Добавляем запросы к локальному ClickHouse.
|
||||
if (cluster.getLocalNodesNum() > 0)
|
||||
|
Loading…
Reference in New Issue
Block a user