mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 09:40:49 +00:00
virtual columns: fixed bug with Group by virtual column, bug when column appears in result more than once [METR-9172]
This commit is contained in:
parent
53929040b0
commit
655e0d4934
@ -20,163 +20,23 @@ namespace VirtualColumnUtils
|
||||
{
|
||||
|
||||
/// Вычислить минимальный числовый суффикс, который надо добавить к строке, чтобы она не присутствовала в множестве
|
||||
inline String chooseSuffix(const NamesAndTypesList & columns, const String & name)
|
||||
{
|
||||
int id = 0;
|
||||
String current_suffix;
|
||||
while (true)
|
||||
{
|
||||
bool done = true;
|
||||
for (const auto & it : columns)
|
||||
if (it.first == name + current_suffix)
|
||||
{
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
if (done) break;
|
||||
++id;
|
||||
current_suffix = toString<Int32>(id);
|
||||
}
|
||||
return current_suffix;
|
||||
}
|
||||
String chooseSuffix(const NamesAndTypesList & columns, const String & name);
|
||||
|
||||
/// Вычислить минимальный общий числовый суффикс, который надо добавить к каждой строке,
|
||||
/// чтобы ни одна не присутствовала в множестве.
|
||||
inline String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector<String> & names)
|
||||
{
|
||||
int id = 0;
|
||||
String current_suffix;
|
||||
while (true)
|
||||
{
|
||||
bool done = true;
|
||||
for (const auto & it : columns)
|
||||
{
|
||||
for (size_t i = 0; i < names.size(); ++i)
|
||||
{
|
||||
if (it.first == names[i] + current_suffix)
|
||||
{
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!done)
|
||||
break;
|
||||
}
|
||||
if (done)
|
||||
break;
|
||||
id ++;
|
||||
current_suffix = toString<Int32>(id);
|
||||
}
|
||||
return current_suffix;
|
||||
}
|
||||
String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector<String> & names);
|
||||
|
||||
/// Добавляет в селект запрос секцию select column_name as value
|
||||
/// Например select _port as 9000.
|
||||
inline void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value)
|
||||
{
|
||||
ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*ast);
|
||||
ASTExpressionList & node = dynamic_cast<ASTExpressionList &>(*select.select_expression_list);
|
||||
ASTs & asts = node.children;
|
||||
ASTLiteral * cur = new ASTLiteral(StringRange(NULL, NULL), value);
|
||||
cur->alias = column_name;
|
||||
ASTPtr column_value = cur;
|
||||
asts.insert(asts.begin(), column_value);
|
||||
}
|
||||
|
||||
/// Проверка, что функция зависит только от заданных столбцов
|
||||
inline bool isValidFunction(ASTPtr expression, const std::vector<String> & columns)
|
||||
{
|
||||
for (size_t i = 0; i < expression->children.size(); ++i)
|
||||
if (!isValidFunction(expression->children[i], columns))
|
||||
return false;
|
||||
|
||||
if (const ASTIdentifier * identifier = dynamic_cast<const ASTIdentifier *>(&* expression))
|
||||
{
|
||||
if (identifier->kind == ASTIdentifier::Kind::Column)
|
||||
{
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
if (columns[i] == identifier->name)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Извлечь все подфункции главной конъюнкции, но зависящие только от заданных столбцов
|
||||
inline void extractFunctions(ASTPtr expression, const std::vector<String> & columns, std::vector<ASTPtr> & result)
|
||||
{
|
||||
if (const ASTFunction * function = dynamic_cast<const ASTFunction *>(&* expression))
|
||||
{
|
||||
if (function->name == "and")
|
||||
{
|
||||
for (size_t i = 0; i < function->arguments->children.size(); ++i)
|
||||
extractFunctions(function->arguments->children[i], columns, result);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (isValidFunction(expression, columns))
|
||||
result.push_back(expression->clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Построить конъюнкцию из заданных функций
|
||||
inline ASTPtr buildWhereExpression(const ASTs & functions)
|
||||
{
|
||||
if (functions.size() == 0) return NULL;
|
||||
if (functions.size() == 1) return functions[0];
|
||||
ASTPtr new_query = new ASTFunction();
|
||||
ASTFunction & new_function = dynamic_cast<ASTFunction & >(*new_query);
|
||||
new_function.name = "and";
|
||||
new_function.arguments = new ASTExpressionList();
|
||||
new_function.arguments->children = functions;
|
||||
new_function.children.push_back(new_function.arguments);
|
||||
return new_query;
|
||||
}
|
||||
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value);
|
||||
|
||||
/// Получить поток блоков содержащий интересующие нас значения виртуальных столбцов
|
||||
/// На вход подается исходный запрос, блок с значениями виртуальных столбцов и контекст
|
||||
inline BlockInputStreamPtr getVirtualColumnsBlocks(ASTPtr query, const Block & input, const Context & context)
|
||||
{
|
||||
const ASTSelectQuery & select = dynamic_cast<ASTSelectQuery & >(*query);
|
||||
if (!select.where_expression)
|
||||
return new OneBlockInputStream(input);
|
||||
|
||||
ASTPtr new_query = new ASTSelectQuery();
|
||||
|
||||
/// Вычисляем имена виртуальных столбцов
|
||||
std::vector<String> columns;
|
||||
for (const auto & it : input.getColumnsList())
|
||||
columns.push_back(it.first);
|
||||
|
||||
/// Формируем запрос и вычисляем имена виртуальных столбцов
|
||||
ASTSelectQuery & new_select = dynamic_cast<ASTSelectQuery & >(*new_query);
|
||||
|
||||
new_select.select_expression_list = new ASTExpressionList();
|
||||
ASTExpressionList & select_list = dynamic_cast<ASTExpressionList & >(*new_select.select_expression_list);
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
select_list.children.push_back(new ASTIdentifier(StringRange(NULL, NULL), columns[i]));
|
||||
|
||||
std::vector<ASTPtr> functions;
|
||||
extractFunctions(select.where_expression, columns, functions);
|
||||
new_select.where_expression = buildWhereExpression(functions);
|
||||
|
||||
if (new_select.select_expression_list)
|
||||
new_select.children.push_back(new_select.select_expression_list);
|
||||
if (new_select.where_expression)
|
||||
new_select.children.push_back(new_select.where_expression);
|
||||
|
||||
/// Возвращаем результат выолнения нового запроса на блоке виртуальных фукнций
|
||||
InterpreterSelectQuery interpreter(new_query, context, columns, input.getColumnsList(),
|
||||
QueryProcessingStage::Complete, 0, new OneBlockInputStream(input));
|
||||
|
||||
return interpreter.execute();
|
||||
}
|
||||
BlockInputStreamPtr getVirtualColumnsBlocks(ASTPtr query, const Block & input, const Context & context);
|
||||
|
||||
/// Извлечь из входного потока множество значений столбца name
|
||||
template<typename T1>
|
||||
inline std::set<T1> extractSingleValueFromBlocks(BlockInputStreamPtr input, const String & name)
|
||||
std::set<T1> extractSingleValueFromBlocks(BlockInputStreamPtr input, const String & name)
|
||||
{
|
||||
std::set<T1> res;
|
||||
input->readPrefix();
|
||||
@ -193,8 +53,8 @@ inline std::set<T1> extractSingleValueFromBlocks(BlockInputStreamPtr input, cons
|
||||
|
||||
/// Извлечь из входного потока множество пар значений в столбцах first_name и second_name
|
||||
template<typename T1, typename T2>
|
||||
inline std::set< std::pair<T1, T2> > extractTwoValuesFromBlocks(BlockInputStreamPtr input,
|
||||
const String & first_name, const String & second_name)
|
||||
std::set< std::pair<T1, T2> > extractTwoValuesFromBlocks(BlockInputStreamPtr input,
|
||||
const String & first_name, const String & second_name)
|
||||
{
|
||||
std::set< std::pair<T1, T2> > res;
|
||||
input->readPrefix();
|
||||
@ -214,6 +74,5 @@ inline std::set< std::pair<T1, T2> > extractTwoValuesFromBlocks(BlockInputStream
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
186
dbms/src/Common/VirtualColumnUtils.cpp
Normal file
186
dbms/src/Common/VirtualColumnUtils.cpp
Normal file
@ -0,0 +1,186 @@
|
||||
#include <DB/Common/VirtualColumnUtils.h>
|
||||
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/DataStreams/AddingConstColumnBlockInputStream.h>
|
||||
#include <DB/DataStreams/OneBlockInputStream.h>
|
||||
#include <DB/DataTypes/DataTypeString.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
#include <DB/Parsers/ASTIdentifier.h>
|
||||
#include <DB/Parsers/ASTExpressionList.h>
|
||||
#include <DB/Parsers/ASTLiteral.h>
|
||||
#include <DB/Parsers/ASTSelectQuery.h>
|
||||
#include <DB/Storages/StoragePtr.h>
|
||||
#include <DB/Interpreters/InterpreterSelectQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace VirtualColumnUtils
|
||||
{
|
||||
|
||||
/// Compute the smallest numeric suffix that has to be appended to `name` so that the
/// result does not match the `first` member (the column name) of any element of `columns`.
/// Candidates are tried in order: no suffix at all, then "1", "2", ...
/// Returns the chosen suffix; it is an empty string when `name` itself is already free.
String chooseSuffix(const NamesAndTypesList & columns, const String & name)
{
    int id = 0;
    String current_suffix;
    while (true)
    {
        /// Build the candidate once per suffix instead of once per compared column.
        const String candidate = name + current_suffix;

        bool is_taken = false;
        for (const auto & column : columns)
        {
            if (column.first == candidate)
            {
                is_taken = true;
                break;
            }
        }

        if (!is_taken)
            break;

        ++id;
        current_suffix = toString<Int32>(id);
    }
    return current_suffix;
}
|
||||
|
||||
/// Compute the smallest common numeric suffix that has to be appended to every string in
/// `names` so that none of the results matches the `first` member (the column name) of any
/// element of `columns`.
/// Candidates are tried in order: no suffix at all, then "1", "2", ...
/// Returns the chosen suffix; it is an empty string when no name collides as is.
String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector<String> & names)
{
    int id = 0;
    String current_suffix;
    while (true)
    {
        /// Build all suffixed names once per suffix instead of once per (column, name) pair.
        std::vector<String> candidates;
        candidates.reserve(names.size());
        for (const auto & name : names)
            candidates.push_back(name + current_suffix);

        bool is_taken = false;
        for (const auto & column : columns)
        {
            for (const auto & candidate : candidates)
            {
                if (column.first == candidate)
                {
                    is_taken = true;
                    break;
                }
            }
            if (is_taken)
                break;
        }

        if (!is_taken)
            break;

        ++id;
        current_suffix = toString<Int32>(id);
    }
    return current_suffix;
}
|
||||
|
||||
/// Make the SELECT query yield `value` under the name `column_name`,
/// i.e. a literal `value` aliased as `column_name` (for example, 9000 aliased as _port).
///
/// Every top-level item of the select expression list that is a column identifier named
/// `column_name` is replaced by that literal. When no such item exists, the literal is
/// inserted at the front of the list instead.
///
/// `ast` must point to an ASTSelectQuery whose select_expression_list is an
/// ASTExpressionList; otherwise the reference dynamic_cast throws std::bad_cast.
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value)
{
    ASTSelectQuery & select_query = dynamic_cast<ASTSelectQuery &>(*ast);
    ASTExpressionList & expression_list = dynamic_cast<ASTExpressionList &>(*select_query.select_expression_list);
    ASTs & select_items = expression_list.children;

    ASTLiteral * literal = new ASTLiteral(StringRange(NULL, NULL), value);
    literal->alias = column_name;
    ASTPtr replacement = literal;

    size_t replaced_count = 0;
    for (auto & item : select_items)
    {
        const ASTIdentifier * identifier = dynamic_cast<const ASTIdentifier *>(&*item);
        if (!identifier)
            continue;
        if (identifier->kind != ASTIdentifier::Kind::Column || identifier->name != column_name)
            continue;

        /// The same literal node is put at every matching position.
        item = replacement;
        ++replaced_count;
    }

    if (replaced_count == 0)
        select_items.insert(select_items.begin(), replacement);
}
|
||||
|
||||
/// Проверка, что функция зависит только от заданных столбцов
|
||||
static bool isValidFunction(ASTPtr expression, const std::vector<String> & columns)
|
||||
{
|
||||
for (size_t i = 0; i < expression->children.size(); ++i)
|
||||
if (!isValidFunction(expression->children[i], columns))
|
||||
return false;
|
||||
|
||||
if (const ASTIdentifier * identifier = dynamic_cast<const ASTIdentifier *>(&* expression))
|
||||
{
|
||||
if (identifier->kind == ASTIdentifier::Kind::Column)
|
||||
{
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
if (columns[i] == identifier->name)
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Извлечь все подфункции главной конъюнкции, но зависящие только от заданных столбцов
|
||||
static void extractFunctions(ASTPtr expression, const std::vector<String> & columns, std::vector<ASTPtr> & result)
|
||||
{
|
||||
if (const ASTFunction * function = dynamic_cast<const ASTFunction *>(&* expression))
|
||||
{
|
||||
if (function->name == "and")
|
||||
{
|
||||
for (size_t i = 0; i < function->arguments->children.size(); ++i)
|
||||
extractFunctions(function->arguments->children[i], columns, result);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (isValidFunction(expression, columns))
|
||||
result.push_back(expression->clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Построить конъюнкцию из заданных функций
|
||||
static ASTPtr buildWhereExpression(const ASTs & functions)
|
||||
{
|
||||
if (functions.size() == 0) return NULL;
|
||||
if (functions.size() == 1) return functions[0];
|
||||
ASTPtr new_query = new ASTFunction();
|
||||
ASTFunction & new_function = dynamic_cast<ASTFunction & >(*new_query);
|
||||
new_function.name = "and";
|
||||
new_function.arguments = new ASTExpressionList();
|
||||
new_function.arguments->children = functions;
|
||||
new_function.children.push_back(new_function.arguments);
|
||||
return new_query;
|
||||
}
|
||||
|
||||
/// Get a stream of blocks containing the virtual column values we are interested in.
/// Takes the original query, a block with the values of the virtual columns, and the context.
/// When the query has no WHERE clause, the input block is returned as is; otherwise a new
/// SELECT over the columns of `input` is built, filtered by the parts of the original WHERE
/// conjunction that depend only on those columns, and executed over `input`.
BlockInputStreamPtr getVirtualColumnsBlocks(ASTPtr query, const Block & input, const Context & context)
{
    const ASTSelectQuery & original_select = dynamic_cast<ASTSelectQuery &>(*query);

    /// Nothing to filter by: pass the block through unchanged.
    if (!original_select.where_expression)
        return new OneBlockInputStream(input);

    /// Collect the names of the virtual columns.
    std::vector<String> columns;
    for (const auto & name_and_type : input.getColumnsList())
        columns.push_back(name_and_type.first);

    /// Build the new query and list the virtual columns in its SELECT clause.
    ASTPtr new_query = new ASTSelectQuery();
    ASTSelectQuery & new_select = dynamic_cast<ASTSelectQuery &>(*new_query);

    new_select.select_expression_list = new ASTExpressionList();
    ASTExpressionList & select_list = dynamic_cast<ASTExpressionList &>(*new_select.select_expression_list);
    for (const auto & column : columns)
        select_list.children.push_back(new ASTIdentifier(StringRange(NULL, NULL), column));

    /// Keep only the conditions that can be evaluated on the virtual columns alone.
    std::vector<ASTPtr> functions;
    extractFunctions(original_select.where_expression, columns, functions);
    new_select.where_expression = buildWhereExpression(functions);

    if (new_select.select_expression_list)
        new_select.children.push_back(new_select.select_expression_list);
    if (new_select.where_expression)
        new_select.children.push_back(new_select.where_expression);

    /// Return the result of executing the new query over the block of virtual columns.
    InterpreterSelectQuery interpreter(new_query, context, columns, input.getColumnsList(),
        QueryProcessingStage::Complete, 0, new OneBlockInputStream(input));

    return interpreter.execute();
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
@ -116,9 +116,21 @@ BlockInputStreams StorageChunkMerger::read(
|
||||
processed_stage = QueryProcessingStage::Complete;
|
||||
QueryProcessingStage::Enum tmp_processed_stage = QueryProcessingStage::Complete;
|
||||
|
||||
bool has_virtual_column = false;
|
||||
|
||||
for (const auto & column : column_names)
|
||||
if (column == _table_column_name)
|
||||
has_virtual_column = true;
|
||||
|
||||
Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables);
|
||||
BlockInputStreamPtr virtual_columns =
|
||||
VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
|
||||
BlockInputStreamPtr virtual_columns;
|
||||
|
||||
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
|
||||
if (has_virtual_column)
|
||||
virtual_columns = VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
|
||||
else /// Иначе, считаем допустимыми все возможные значения
|
||||
virtual_columns = new OneBlockInputStream(virtual_columns_block);
|
||||
|
||||
std::set<String> values = VirtualColumnUtils::extractSingleValueFromBlocks<String>(virtual_columns, _table_column_name);
|
||||
bool all_inclusive = (values.size() == virtual_columns_block.rows());
|
||||
|
||||
@ -139,9 +151,15 @@ BlockInputStreams StorageChunkMerger::read(
|
||||
if (real_column_names.size() == 0)
|
||||
real_column_names.push_back(ExpressionActions::getSmallestColumn((*it)->getColumnsList()));
|
||||
|
||||
ASTPtr modified_query_ast = query->clone();
|
||||
|
||||
/// Подменяем виртуальный столбец на его значение
|
||||
if (!virt_column_names.empty())
|
||||
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _table_column_name, (*it)->getTableName());
|
||||
|
||||
BlockInputStreams source_streams = (*it)->read(
|
||||
real_column_names,
|
||||
query,
|
||||
modified_query_ast,
|
||||
settings,
|
||||
tmp_processed_stage,
|
||||
max_block_size,
|
||||
|
@ -44,6 +44,16 @@ BlockInputStreams StorageChunks::read(
|
||||
size_t max_block_size,
|
||||
unsigned threads)
|
||||
{
|
||||
bool has_virtual_column = false;
|
||||
|
||||
for (const auto & column : column_names)
|
||||
if (column == _table_column_name)
|
||||
has_virtual_column = true;
|
||||
|
||||
/// Если виртуальных столбцов нет, просто считать данные из таблицы
|
||||
if (!has_virtual_column)
|
||||
return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
|
||||
|
||||
Block virtual_columns_block = getBlockWithVirtualColumns();
|
||||
BlockInputStreamPtr virtual_columns =
|
||||
VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
|
||||
|
@ -167,14 +167,20 @@ BlockInputStreams StorageDistributed::read(
|
||||
}
|
||||
|
||||
Names columns_to_remove;
|
||||
if (!select_host_column)
|
||||
if (!select_host_column && need_host_column)
|
||||
columns_to_remove.push_back(_host_column_name);
|
||||
if (!select_port_column)
|
||||
if (!select_port_column && need_port_column)
|
||||
columns_to_remove.push_back(_port_column_name);
|
||||
|
||||
Block virtual_columns_block = getBlockWithVirtualColumns();
|
||||
BlockInputStreamPtr virtual_columns =
|
||||
VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
|
||||
BlockInputStreamPtr virtual_columns;
|
||||
|
||||
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
|
||||
if (need_host_column || need_port_column)
|
||||
virtual_columns = VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
|
||||
else /// Иначе, считаем допустимыми все возможные значения
|
||||
virtual_columns = new OneBlockInputStream(virtual_columns_block);
|
||||
|
||||
std::set< std::pair<String, UInt16> > values =
|
||||
VirtualColumnUtils::extractTwoValuesFromBlocks<String, UInt16>(virtual_columns, _host_column_name, _port_column_name);
|
||||
bool all_inclusive = values.size() == virtual_columns_block.rows();
|
||||
|
@ -73,8 +73,14 @@ BlockInputStreams StorageMerge::read(
|
||||
}
|
||||
|
||||
Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables);
|
||||
BlockInputStreamPtr virtual_columns =
|
||||
VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
|
||||
BlockInputStreamPtr virtual_columns;
|
||||
|
||||
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
|
||||
if (!virt_column_names.empty())
|
||||
virtual_columns = VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
|
||||
else /// Иначе, считаем допустимыми все возможные значения
|
||||
virtual_columns = new OneBlockInputStream(virtual_columns_block);
|
||||
|
||||
std::set<String> values = VirtualColumnUtils::extractSingleValueFromBlocks<String>(virtual_columns, _table_column_name);
|
||||
bool all_inclusive = (values.size() == virtual_columns_block.rows());
|
||||
|
||||
@ -87,9 +93,13 @@ BlockInputStreams StorageMerge::read(
|
||||
if (real_column_names.size() == 0)
|
||||
real_column_names.push_back(ExpressionActions::getSmallestColumn((*it)->getColumnsList()));
|
||||
|
||||
/// Подменяем виртуальный столбец на его значение
|
||||
ASTPtr modified_query_ast = query->clone();
|
||||
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _table_column_name, (*it)->getTableName());
|
||||
|
||||
BlockInputStreams source_streams = (*it)->read(
|
||||
real_column_names,
|
||||
query,
|
||||
modified_query_ast,
|
||||
settings,
|
||||
tmp_processed_stage,
|
||||
max_block_size,
|
||||
|
Loading…
Reference in New Issue
Block a user