mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Add initial support for SELECT requests chained by UNION ALL.
This commit is contained in:
parent
6323cf5977
commit
8995500f14
@ -125,6 +125,8 @@ typedef std::list<Block> BlocksList;
|
||||
/// Сравнить типы столбцов у блоков. Порядок столбцов имеет значение. Имена не имеют значения.
|
||||
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);
|
||||
|
||||
bool blocksHaveCompatibleStructure(const Block& lhs, const Block& rhs);
|
||||
|
||||
}
|
||||
|
||||
namespace std
|
||||
|
@ -265,6 +265,7 @@ namespace ErrorCodes
|
||||
TOO_MUCH_RETRIES_TO_FETCH_PARTS,
|
||||
PARTITION_ALREADY_EXISTS,
|
||||
PARTITION_DOESNT_EXIST,
|
||||
UNION_ALL_INCOMPATIBLE_RESULTS,
|
||||
|
||||
POCO_EXCEPTION = 1000,
|
||||
STD_EXCEPTION,
|
||||
|
10
dbms/include/DB/DataTypes/typesAreCompatible.h
Normal file
10
dbms/include/DB/DataTypes/typesAreCompatible.h
Normal file
@ -0,0 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/DataTypes/IDataType.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
bool typesAreCompatible(const IDataType& lhs, const IDataType& rhs);
|
||||
|
||||
}
|
@ -39,7 +39,8 @@ public:
|
||||
const Context & context_,
|
||||
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
|
||||
size_t subquery_depth_ = 0,
|
||||
BlockInputStreamPtr input = nullptr);
|
||||
BlockInputStreamPtr input = nullptr,
|
||||
bool is_union_all_head_ = true);
|
||||
|
||||
InterpreterSelectQuery(
|
||||
ASTPtr query_ptr_,
|
||||
@ -87,6 +88,8 @@ private:
|
||||
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
|
||||
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
|
||||
|
||||
BlockInputStreamPtr executeSingleQuery();
|
||||
|
||||
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
|
||||
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
|
||||
bool overflow_row, bool final);
|
||||
@ -111,6 +114,9 @@ private:
|
||||
size_t subquery_depth;
|
||||
ExpressionAnalyzerPtr query_analyzer;
|
||||
BlockInputStreams streams;
|
||||
|
||||
/// true, если это голова цепочки запросов SELECT объединённых ключевыми словами UNION ALL.
|
||||
bool is_union_all_head;
|
||||
|
||||
/// Таблица, откуда читать данные, если не подзапрос.
|
||||
StoragePtr storage;
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include <DB/Columns/ColumnArray.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
#include <DB/DataTypes/typesAreCompatible.h>
|
||||
|
||||
#include <DB/Parsers/ASTExpressionList.h>
|
||||
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
||||
@ -363,7 +364,7 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
|
||||
{
|
||||
const IDataType & lhs_type = *lhs.getByPosition(i).type;
|
||||
const IDataType & rhs_type = *rhs.getByPosition(i).type;
|
||||
|
||||
|
||||
if (lhs_type.getName() != rhs_type.getName())
|
||||
return false;
|
||||
}
|
||||
@ -371,6 +372,23 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
|
||||
return true;
|
||||
}
|
||||
|
||||
bool blocksHaveCompatibleStructure(const Block & lhs, const Block & rhs)
|
||||
{
|
||||
size_t columns = lhs.columns();
|
||||
if (rhs.columns() != columns)
|
||||
return false;
|
||||
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
{
|
||||
const auto & lhs_type = *lhs.getByPosition(i).type;
|
||||
const auto & rhs_type = *rhs.getByPosition(i).type;
|
||||
|
||||
if (!typesAreCompatible(lhs_type, rhs_type))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Block::clear()
|
||||
{
|
||||
|
65
dbms/src/DataTypes/typesAreCompatible.cpp
Normal file
65
dbms/src/DataTypes/typesAreCompatible.cpp
Normal file
@ -0,0 +1,65 @@
|
||||
#include <DB/DataTypes/typesAreCompatible.h>
|
||||
#include <DB/DataTypes/DataTypeFixedString.h>
|
||||
#include <DB/DataTypes/DataTypeString.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <utility>
|
||||
#include <algorithm>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
#define REGISTER_COMPATIBLE_TYPES(T, U) \
|
||||
{ T.getName(), U.getName() }, \
|
||||
{ U.getName(), T.getName() } \
|
||||
|
||||
std::vector<std::pair<std::string, std::string> > init()
|
||||
{
|
||||
std::vector<std::pair<std::string, std::string> > types_desc =
|
||||
{
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeString(), DB::DataTypeFixedString(1)),
|
||||
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeFloat32(), DB::DataTypeFloat64()),
|
||||
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt8()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt16()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt16()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt32()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt32()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt64()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt64()),
|
||||
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt16()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeInt32()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt32()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeInt64()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt64()),
|
||||
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeUInt32()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeInt64()),
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeUInt64()),
|
||||
|
||||
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt64(), DB::DataTypeUInt64())
|
||||
};
|
||||
std::sort(types_desc.begin(), types_desc.end());
|
||||
return types_desc;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
bool typesAreCompatible(const IDataType & lhs, const IDataType & rhs)
|
||||
{
|
||||
static const auto types_desc = init();
|
||||
|
||||
if (lhs.getName() == rhs.getName())
|
||||
return true;
|
||||
|
||||
return std::binary_search(types_desc.begin(), types_desc.end(), std::make_pair(lhs.getName(), rhs.getName()));
|
||||
}
|
||||
|
||||
}
|
@ -85,12 +85,26 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndType
|
||||
|
||||
if (input_)
|
||||
streams.push_back(input_);
|
||||
|
||||
if (is_union_all_head && (!query.next_union_all.isNull()))
|
||||
{
|
||||
// Проверить, что результаты всех запросов SELECT cовместимые.
|
||||
Block previous = getSampleBlock();
|
||||
for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast<ASTSelectQuery *>(&*tree))->next_union_all)
|
||||
{
|
||||
Block current = InterpreterSelectQuery(tree, context, to_stage, subquery_depth, nullptr, false).getSampleBlock();
|
||||
if (!blocksHaveCompatibleStructure(previous, current))
|
||||
throw Exception("Incompatible results in the SELECT queries of the UNION ALL chain", ErrorCodes::UNION_ALL_INCOMPATIBLE_RESULTS);
|
||||
previous = std::move(current);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_,
|
||||
size_t subquery_depth_, BlockInputStreamPtr input_)
|
||||
size_t subquery_depth_, BlockInputStreamPtr input_, bool is_union_all_head_)
|
||||
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
|
||||
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
|
||||
is_union_all_head(is_union_all_head_),
|
||||
log(&Logger::get("InterpreterSelectQuery"))
|
||||
{
|
||||
init(input_);
|
||||
@ -101,6 +115,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
|
||||
QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
|
||||
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
|
||||
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
|
||||
is_union_all_head(true),
|
||||
log(&Logger::get("InterpreterSelectQuery"))
|
||||
{
|
||||
/** Оставляем в запросе в секции SELECT только нужные столбцы.
|
||||
@ -117,6 +132,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
|
||||
const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
|
||||
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
|
||||
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
|
||||
is_union_all_head(true),
|
||||
log(&Logger::get("InterpreterSelectQuery"))
|
||||
{
|
||||
/** Оставляем в запросе в секции SELECT только нужные столбцы.
|
||||
@ -187,8 +203,29 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
|
||||
: in;
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreamPtr InterpreterSelectQuery::execute()
|
||||
{
|
||||
BlockInputStreamPtr first_query_plan = executeSingleQuery();
|
||||
|
||||
if (is_union_all_head && !query.next_union_all.isNull())
|
||||
{
|
||||
BlockInputStreams streams;
|
||||
streams.push_back(first_query_plan);
|
||||
|
||||
for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast<ASTSelectQuery *>(&*tree))->next_union_all)
|
||||
{
|
||||
Context select_query_context = context;
|
||||
InterpreterSelectQuery interpreter(tree, select_query_context, to_stage, subquery_depth, nullptr, false);
|
||||
streams.push_back(maybeAsynchronous(interpreter.executeSingleQuery(), settings.asynchronous));
|
||||
}
|
||||
|
||||
return new UnionBlockInputStream(streams, settings.max_threads);
|
||||
}
|
||||
else
|
||||
return first_query_plan;
|
||||
}
|
||||
|
||||
BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
|
||||
{
|
||||
/** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных.
|
||||
* Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем
|
||||
|
@ -238,6 +238,13 @@ void formatAST(const ASTSelectQuery & ast, std::ostream & s, size_t indent, bo
|
||||
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "FORMAT " << (hilite ? hilite_none : "");
|
||||
formatAST(*ast.format, s, indent, hilite, one_line);
|
||||
}
|
||||
|
||||
if (!ast.next_union_all.isNull())
|
||||
{
|
||||
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "UNION ALL " << (hilite ? hilite_none : "");
|
||||
const ASTSelectQuery * next_ast = static_cast<const ASTSelectQuery *>(&*ast.next_union_all);
|
||||
formatAST(*next_ast, s, false, hilite, one_line, need_parens);
|
||||
}
|
||||
}
|
||||
|
||||
void formatAST(const ASTSubquery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)
|
||||
|
Loading…
Reference in New Issue
Block a user