This commit is contained in:
Alexey Milovidov 2014-12-24 22:00:41 +03:00
commit 33efb971b4
24 changed files with 257 additions and 33 deletions

View File

@ -266,6 +266,7 @@ namespace ErrorCodes
PARTITION_ALREADY_EXISTS, PARTITION_ALREADY_EXISTS,
PARTITION_DOESNT_EXIST, PARTITION_DOESNT_EXIST,
UNION_ALL_RESULT_STRUCTURES_MISMATCH, UNION_ALL_RESULT_STRUCTURES_MISMATCH,
UNION_ALL_COLUMN_ALIAS_MISMATCH,
CLIENT_OUTPUT_FORMAT_SPECIFIED, CLIENT_OUTPUT_FORMAT_SPECIFIED,
POCO_EXCEPTION = 1000, POCO_EXCEPTION = 1000,

View File

@ -463,8 +463,6 @@ public:
ErrorCodes::ILLEGAL_COLUMN); ErrorCodes::ILLEGAL_COLUMN);
} }
}; };
class FunctionIPv4NumToString : public IFunction class FunctionIPv4NumToString : public IFunction
{ {
public: public:
@ -661,6 +659,115 @@ public:
}; };
class FunctionIPv4NumToStringClassC : public IFunction
{
public:
static constexpr auto name = "IPv4NumToStringClassC";
static IFunction * create(const Context & context) { return new FunctionIPv4NumToStringClassC; }
/// Получить имя функции.
String getName() const
{
return name;
}
/// Получить тип результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
DataTypePtr getReturnType(const DataTypes & arguments) const
{
if (arguments.size() != 1)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!typeid_cast<const DataTypeUInt32 *>(&*arguments[0]))
throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName() + ", expected UInt32",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return new DataTypeString;
}
static void formatIP(UInt32 ip, char *& out)
{
char * begin = out;
for (auto i = 0; i < 3; ++i)
*(out++) = 'x';
/// Запишем все задом наперед.
for (size_t offset = 8; offset <= 24; offset += 8)
{
if (offset > 0)
*(out++) = '.';
/// Достаем очередной байт.
UInt32 value = (ip >> offset) & static_cast<UInt32>(255);
/// Быстрее, чем sprintf.
if (value == 0)
{
*(out++) = '0';
}
else
{
while (value > 0)
{
*(out++) = '0' + value % 10;
value /= 10;
}
}
}
/// И развернем.
std::reverse(begin, out);
*(out++) = '\0';
}
/// Выполнить функцию над блоком.
void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{
const ColumnPtr column = block.getByPosition(arguments[0]).column;
if (const ColumnVector<UInt32> * col = typeid_cast<const ColumnVector<UInt32> *>(&*column))
{
const ColumnVector<UInt32>::Container_t & vec_in = col->getData();
ColumnString * col_res = new ColumnString;
block.getByPosition(result).column = col_res;
ColumnString::Chars_t & vec_res = col_res->getChars();
ColumnString::Offsets_t & offsets_res = col_res->getOffsets();
vec_res.resize(vec_in.size() * INET_ADDRSTRLEN); /// самое длинное значение: 255.255.255.255\0
offsets_res.resize(vec_in.size());
char * begin = reinterpret_cast<char *>(&vec_res[0]);
char * pos = begin;
for (size_t i = 0; i < vec_in.size(); ++i)
{
formatIP(vec_in[i], pos);
offsets_res[i] = pos - begin;
}
vec_res.resize(pos - begin);
}
else if (const ColumnConst<UInt32> * col = typeid_cast<const ColumnConst<UInt32> *>(&*column))
{
char buf[16];
char * pos = buf;
formatIP(col->getData(), pos);
ColumnConstString * col_res = new ColumnConstString(col->size(), buf);
block.getByPosition(result).column = col_res;
}
else
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
};
class FunctionHex : public IFunction class FunctionHex : public IFunction
{ {
public: public:

View File

@ -77,7 +77,10 @@ public:
private: private:
typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr; typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr;
void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList()); void init(BlockInputStreamPtr input, const Names & required_column_names = Names(), const NamesAndTypesList & table_column_names = NamesAndTypesList());
void basic_init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names);
void init_union_all();
void init_query_analyzer();
/// Выполнить один запрос SELECT из цепочки UNION ALL. /// Выполнить один запрос SELECT из цепочки UNION ALL.
void executeSingleQuery(bool should_perform_union_hint = true); void executeSingleQuery(bool should_perform_union_hint = true);

View File

@ -52,6 +52,31 @@ public:
return false; return false;
} }
/// Переименовать столбцы запроса в такие же имена, как в исходном запросе.
void renameColumns(const ASTSelectQuery & source)
{
const ASTs & from = source.select_expression_list->children;
ASTs & to = select_expression_list->children;
if (from.size() != to.size())
throw Exception("Size mismatch in UNION ALL chain",
DB::ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
for (size_t i = 0; i < from.size(); ++i)
{
/// Если столбец имеет алиас, то он должен совпадать с названием исходного столбца.
/// В противном случае мы ему присваиваем алиас, если требуется.
if (!to[i]->tryGetAlias().empty())
{
if (to[i]->tryGetAlias() != from[i]->getAliasOrColumnName())
throw Exception("Column alias mismatch in UNION ALL chain",
DB::ErrorCodes::UNION_ALL_COLUMN_ALIAS_MISMATCH);
}
else if (to[i]->getColumnName() != from[i]->getAliasOrColumnName())
to[i]->setAlias(from[i]->getAliasOrColumnName());
}
}
/// Переписывает select_expression_list, чтобы вернуть только необходимые столбцы в правильном порядке. /// Переписывает select_expression_list, чтобы вернуть только необходимые столбцы в правильном порядке.
void rewriteSelectExpressionList(const Names & column_names) void rewriteSelectExpressionList(const Names & column_names)
{ {

View File

@ -825,7 +825,7 @@ private:
} }
if (has_vertical_output_suffix) if (has_vertical_output_suffix)
current_format = config().getString("format", "Vertical"); current_format = "Vertical";
block_std_out = context.getFormatFactory().getOutput(current_format, std_out, block); block_std_out = context.getFormatFactory().getOutput(current_format, std_out, block);
block_std_out->writePrefix(); block_std_out->writePrefix();

View File

@ -11,6 +11,7 @@ void registerFunctionsCoding(FunctionFactory & factory)
factory.registerFunction<FunctionIPv6StringToNum>(); factory.registerFunction<FunctionIPv6StringToNum>();
factory.registerFunction<FunctionIPv4NumToString>(); factory.registerFunction<FunctionIPv4NumToString>();
factory.registerFunction<FunctionIPv4StringToNum>(); factory.registerFunction<FunctionIPv4StringToNum>();
factory.registerFunction<FunctionIPv4NumToStringClassC>();
factory.registerFunction<FunctionHex>(); factory.registerFunction<FunctionHex>();
factory.registerFunction<FunctionUnhex>(); factory.registerFunction<FunctionUnhex>();
factory.registerFunction<FunctionBitmaskToArray>(); factory.registerFunction<FunctionBitmaskToArray>();

View File

@ -33,7 +33,30 @@
namespace DB namespace DB
{ {
void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndTypesList & table_column_names) void InterpreterSelectQuery::init(BlockInputStreamPtr input, const Names & required_column_names, const NamesAndTypesList & table_column_names)
{
if (isFirstSelectInsideUnionAll())
{
/// Функция rewriteExpressionList() работает правильно только, если имена столбцов совпадают
/// для каждого запроса цепочки UNION ALL. Поэтому сначала выполняем инициализацию.
basic_init(input, table_column_names);
init_union_all();
if (!required_column_names.empty() && (context.getColumns().size() != required_column_names.size()))
{
rewriteExpressionList(required_column_names);
/// Теперь имеется устаревшая информация для выполнения запроса. Обновляем эту информацию.
init_query_analyzer();
}
}
else
{
if (!required_column_names.empty())
rewriteExpressionList(required_column_names);
basic_init(input, table_column_names);
}
}
void InterpreterSelectQuery::basic_init(BlockInputStreamPtr input_, const NamesAndTypesList & table_column_names)
{ {
ProfileEvents::increment(ProfileEvents::SelectQuery); ProfileEvents::increment(ProfileEvents::SelectQuery);
@ -85,9 +108,10 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndType
if (input_) if (input_)
streams.push_back(input_); streams.push_back(input_);
}
if (isFirstSelectInsideUnionAll()) void InterpreterSelectQuery::init_union_all()
{ {
/// Создаем цепочку запросов SELECT и проверяем, что результаты всех запросов SELECT cовместимые. /// Создаем цепочку запросов SELECT и проверяем, что результаты всех запросов SELECT cовместимые.
/// NOTE Мы можем безопасно применить static_cast вместо typeid_cast, /// NOTE Мы можем безопасно применить static_cast вместо typeid_cast,
/// потому что знаем, что в цепочке UNION ALL имеются только деревья типа SELECT. /// потому что знаем, что в цепочке UNION ALL имеются только деревья типа SELECT.
@ -103,9 +127,22 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input_, const NamesAndType
+ "\n\nwhile expecting:\n\n" + first.dumpStructure() + "\n\ninstead", + "\n\nwhile expecting:\n\n" + first.dumpStructure() + "\n\ninstead",
ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH); ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
} }
// Переименовать столбцы каждого запроса цепочки UNION ALL в такие же имена, как в первом запросе.
for (IAST * tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
{
auto & ast = static_cast<ASTSelectQuery &>(*tree);
ast.renameColumns(query);
} }
} }
void InterpreterSelectQuery::init_query_analyzer()
{
query_analyzer = new ExpressionAnalyzer(query_ptr, context, storage, subquery_depth, true);
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
p->query_analyzer = new ExpressionAnalyzer(p->query_ptr, p->context, p->storage, p->subquery_depth, true);
}
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_, InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_, BlockInputStreamPtr input_, bool is_union_all_head_) size_t subquery_depth_, BlockInputStreamPtr input_, bool is_union_all_head_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)), : query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
@ -124,8 +161,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
is_union_all_head(true), is_union_all_head(true),
log(&Logger::get("InterpreterSelectQuery")) log(&Logger::get("InterpreterSelectQuery"))
{ {
rewriteExpressionList(required_column_names_); init(input_, required_column_names_);
init(input_);
} }
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_,
@ -136,8 +172,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
is_union_all_head(true), is_union_all_head(true),
log(&Logger::get("InterpreterSelectQuery")) log(&Logger::get("InterpreterSelectQuery"))
{ {
rewriteExpressionList(required_column_names_); init(input_, required_column_names_, table_column_names);
init(input_, table_column_names);
} }
void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column_names) void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column_names)
@ -147,16 +182,15 @@ void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column
for (IAST* tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get()) for (IAST* tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
{ {
auto & next_query = *(static_cast<ASTSelectQuery *>(tree)); auto & next_query = static_cast<ASTSelectQuery &>(*tree);
if (next_query.distinct) if (next_query.distinct)
return; return;
} }
query.rewriteSelectExpressionList(required_column_names); query.rewriteSelectExpressionList(required_column_names);
for (IAST* tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get()) for (IAST* tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
{ {
auto & next_query = *(static_cast<ASTSelectQuery *>(tree)); auto & next_query = static_cast<ASTSelectQuery &>(*tree);
next_query.rewriteSelectExpressionList(required_column_names); next_query.rewriteSelectExpressionList(required_column_names);
} }
} }

View File

@ -1,8 +1,10 @@
DROP TABLE IF EXISTS data2013; DROP TABLE IF EXISTS data2013;
DROP TABLE IF EXISTS data2014; DROP TABLE IF EXISTS data2014;
DROP TABLE IF EXISTS data2015;
CREATE TABLE data2013 (name String, value UInt32) ENGINE = Memory; CREATE TABLE data2013 (name String, value UInt32) ENGINE = Memory;
CREATE TABLE data2014 (name String, value UInt32) ENGINE = Memory; CREATE TABLE data2014 (name String, value UInt32) ENGINE = Memory;
CREATE TABLE data2015 (data_name String, data_value UInt32) ENGINE = Memory;
INSERT INTO data2013(name,value) VALUES('Alice', 1000); INSERT INTO data2013(name,value) VALUES('Alice', 1000);
INSERT INTO data2013(name,value) VALUES('Bob', 2000); INSERT INTO data2013(name,value) VALUES('Bob', 2000);
@ -12,6 +14,9 @@ INSERT INTO data2014(name,value) VALUES('Alice', 2000);
INSERT INTO data2014(name,value) VALUES('Bob', 2000); INSERT INTO data2014(name,value) VALUES('Bob', 2000);
INSERT INTO data2014(name,value) VALUES('Dennis', 35000); INSERT INTO data2014(name,value) VALUES('Dennis', 35000);
INSERT INTO data2015(data_name, data_value) VALUES('Foo', 42);
INSERT INTO data2015(data_name, data_value) VALUES('Bar', 1);
SELECT val FROM SELECT val FROM
(SELECT value AS val FROM data2013 WHERE name = 'Alice' (SELECT value AS val FROM data2013 WHERE name = 'Alice'
UNION ALL UNION ALL

View File

@ -0,0 +1,2 @@
1
2

View File

@ -0,0 +1 @@
SELECT * FROM (SELECT 1 UNION ALL SELECT 2) ORDER BY 1 ASC;

View File

@ -0,0 +1,2 @@
1
2

View File

@ -0,0 +1 @@
SELECT * FROM (SELECT 1 AS X UNION ALL SELECT 2) ORDER BY X ASC;

View File

@ -0,0 +1,3 @@
1
2
3

View File

@ -0,0 +1 @@
SELECT * FROM (SELECT 1 AS X UNION ALL SELECT 2 UNION ALL SELECT 3 AS X) ORDER BY X ASC;

View File

@ -0,0 +1,11 @@
1
2
3
4
5
6
7
8
9
10
12345678902

View File

@ -0,0 +1 @@
SELECT X + 1 FROM (SELECT 12345678901 AS X UNION ALL SELECT number FROM system.numbers LIMIT 10) ORDER BY X ASC;

View File

@ -0,0 +1,5 @@
Alice
Bar
Bob
Carol
Foo

View File

@ -0,0 +1 @@
SELECT name FROM (SELECT name FROM data2013 UNION ALL SELECT data_name FROM data2015) ORDER BY name ASC;

View File

@ -0,0 +1,5 @@
Alice
Bar
Bob
Carol
Foo

View File

@ -0,0 +1 @@
SELECT X FROM (SELECT name AS X FROM data2013 UNION ALL SELECT data_name FROM data2015) ORDER BY X ASC;

View File

@ -0,0 +1,5 @@
Alice
Bar
Bob
Carol
Foo

View File

@ -0,0 +1 @@
SELECT name FROM (SELECT name FROM data2013 UNION ALL SELECT data_name AS name FROM data2015) ORDER BY name ASC;

View File

@ -0,0 +1,4 @@
1
1
1
1

View File

@ -0,0 +1,4 @@
select IPv4NumToStringClassC(toUInt32(0)) = '0.0.0.xxx';
select IPv4NumToStringClassC(0x7f000001) = '127.0.0.xxx';
select sum(IPv4NumToStringClassC(materialize(toUInt32(0))) = '0.0.0.xxx') = count() array join range(1024) as n;
select sum(IPv4NumToStringClassC(materialize(0x7f000001)) = '127.0.0.xxx') = count() array join range(1024) as n;