Add initial support for SELECT requests chained by UNION ALL.

This commit is contained in:
Alexey Arno 2014-12-12 16:26:17 +03:00
parent 7616eb8e5e
commit b630f30a25
8 changed files with 150 additions and 4 deletions

View File

@ -125,6 +125,8 @@ typedef std::list<Block> BlocksList;
/// Сравнить типы столбцов у блоков. Порядок столбцов имеет значение. Имена не имеют значения.
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);
bool blocksHaveCompatibleStructure(const Block& lhs, const Block& rhs);
}
namespace std

View File

@ -265,6 +265,7 @@ namespace ErrorCodes
TOO_MUCH_RETRIES_TO_FETCH_PARTS,
PARTITION_ALREADY_EXISTS,
PARTITION_DOESNT_EXIST,
UNION_ALL_INCOMPATIBLE_RESULTS,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -0,0 +1,10 @@
#pragma once
#include <DB/DataTypes/IDataType.h>
namespace DB
{
bool typesAreCompatible(const IDataType& lhs, const IDataType& rhs);
}

View File

@ -39,7 +39,8 @@ public:
const Context & context_,
QueryProcessingStage::Enum to_stage_ = QueryProcessingStage::Complete,
size_t subquery_depth_ = 0,
BlockInputStreamPtr input = nullptr);
BlockInputStreamPtr input = nullptr,
bool is_union_all_head_ = true);
InterpreterSelectQuery(
ASTPtr query_ptr_,
@ -87,6 +88,8 @@ private:
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
BlockInputStreamPtr executeSingleQuery();
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
bool overflow_row, bool final);
@ -111,6 +114,9 @@ private:
size_t subquery_depth;
ExpressionAnalyzerPtr query_analyzer;
BlockInputStreams streams;
/// true, если это голова цепочки запросов SELECT объединённых ключевыми словами UNION ALL.
bool is_union_all_head;
/// Таблица, откуда читать данные, если не подзапрос.
StoragePtr storage;

View File

@ -9,6 +9,7 @@
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/typesAreCompatible.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
@ -363,7 +364,7 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
const IDataType & lhs_type = *lhs.getByPosition(i).type;
const IDataType & rhs_type = *rhs.getByPosition(i).type;
if (lhs_type.getName() != rhs_type.getName())
return false;
}
@ -371,6 +372,23 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
return true;
}
bool blocksHaveCompatibleStructure(const Block & lhs, const Block & rhs)
{
size_t columns = lhs.columns();
if (rhs.columns() != columns)
return false;
for (size_t i = 0; i < columns; ++i)
{
const auto & lhs_type = *lhs.getByPosition(i).type;
const auto & rhs_type = *rhs.getByPosition(i).type;
if (!typesAreCompatible(lhs_type, rhs_type))
return false;
}
return true;
}
void Block::clear()
{

View File

@ -0,0 +1,65 @@
#include <DB/DataTypes/typesAreCompatible.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <string>
#include <vector>
#include <utility>
#include <algorithm>
namespace
{
#define REGISTER_COMPATIBLE_TYPES(T, U) \
{ T.getName(), U.getName() }, \
{ U.getName(), T.getName() } \
std::vector<std::pair<std::string, std::string> > init()
{
std::vector<std::pair<std::string, std::string> > types_desc =
{
REGISTER_COMPATIBLE_TYPES(DB::DataTypeString(), DB::DataTypeFixedString(1)),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeFloat32(), DB::DataTypeFloat64()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt8()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt16()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt16()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt32()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt32()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeInt64()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt8(), DB::DataTypeUInt64()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt16()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeInt32()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt32()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeInt64()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt16(), DB::DataTypeUInt64()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeUInt32()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeInt64()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt32(), DB::DataTypeUInt64()),
REGISTER_COMPATIBLE_TYPES(DB::DataTypeInt64(), DB::DataTypeUInt64())
};
std::sort(types_desc.begin(), types_desc.end());
return types_desc;
}
}
namespace DB
{
bool typesAreCompatible(const IDataType & lhs, const IDataType & rhs)
{
static const auto types_desc = init();
if (lhs.getName() == rhs.getName())
return true;
return std::binary_search(types_desc.begin(), types_desc.end(), std::make_pair(lhs.getName(), rhs.getName()));
}
}

View File

@ -85,12 +85,26 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndType
if (input_)
streams.push_back(input_);
if (is_union_all_head && (!query.next_union_all.isNull()))
{
// Проверить, что результаты всех запросов SELECT cовместимые.
Block previous = getSampleBlock();
for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast<ASTSelectQuery *>(&*tree))->next_union_all)
{
Block current = InterpreterSelectQuery(tree, context, to_stage, subquery_depth, nullptr, false).getSampleBlock();
if (!blocksHaveCompatibleStructure(previous, current))
throw Exception("Incompatible results in the SELECT queries of the UNION ALL chain", ErrorCodes::UNION_ALL_INCOMPATIBLE_RESULTS);
previous = std::move(current);
}
}
}
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_, BlockInputStreamPtr input_)
size_t subquery_depth_, BlockInputStreamPtr input_, bool is_union_all_head_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_union_all_head(is_union_all_head_),
log(&Logger::get("InterpreterSelectQuery"))
{
init(input_);
@ -101,6 +115,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_union_all_head(true),
log(&Logger::get("InterpreterSelectQuery"))
{
/** Оставляем в запросе в секции SELECT только нужные столбцы.
@ -117,6 +132,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
const NamesAndTypesList & table_column_names, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_union_all_head(true),
log(&Logger::get("InterpreterSelectQuery"))
{
/** Оставляем в запросе в секции SELECT только нужные столбцы.
@ -187,8 +203,29 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
: in;
}
BlockInputStreamPtr InterpreterSelectQuery::execute()
{
BlockInputStreamPtr first_query_plan = executeSingleQuery();
if (is_union_all_head && !query.next_union_all.isNull())
{
BlockInputStreams streams;
streams.push_back(first_query_plan);
for (ASTPtr tree = query.next_union_all; !tree.isNull(); tree = (static_cast<ASTSelectQuery *>(&*tree))->next_union_all)
{
Context select_query_context = context;
InterpreterSelectQuery interpreter(tree, select_query_context, to_stage, subquery_depth, nullptr, false);
streams.push_back(maybeAsynchronous(interpreter.executeSingleQuery(), settings.asynchronous));
}
return new UnionBlockInputStream(streams, settings.max_threads);
}
else
return first_query_plan;
}
BlockInputStreamPtr InterpreterSelectQuery::executeSingleQuery()
{
/** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных.
* Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем

View File

@ -238,6 +238,13 @@ void formatAST(const ASTSelectQuery & ast, std::ostream & s, size_t indent, bo
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "FORMAT " << (hilite ? hilite_none : "");
formatAST(*ast.format, s, indent, hilite, one_line);
}
if (!ast.next_union_all.isNull())
{
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "UNION ALL " << (hilite ? hilite_none : "");
const ASTSelectQuery * next_ast = static_cast<const ASTSelectQuery *>(&*ast.next_union_all);
formatAST(*next_ast, s, false, hilite, one_line, need_parens);
}
}
void formatAST(const ASTSubquery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)