mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 01:30:51 +00:00
dbms: clearing temporary columns while executing query - improved performance (not tested) [#CONV-2944].
This commit is contained in:
parent
aad8bab2d0
commit
fa0dc68533
@ -18,11 +18,12 @@ using Poco::SharedPtr;
|
||||
* Выражение не меняет количество строк в потоке, и обрабатывает каждую строку независимо от других.
|
||||
* part_id - идентификатор части выражения, которую надо вычислять.
|
||||
* Например, может потребоваться вычислить только часть выражения в секции WHERE.
|
||||
* clear_temporaries - удалить временные столбцы из блока, которые больше не понадобятся ни для каких вычислений.
|
||||
*/
|
||||
class ExpressionBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
ExpressionBlockInputStream(BlockInputStreamPtr input_, ExpressionPtr expression_, unsigned part_id_ = 0)
|
||||
ExpressionBlockInputStream(BlockInputStreamPtr input_, ExpressionPtr expression_, unsigned part_id_ = 0, bool clear_temporaries_ = false)
|
||||
: input(input_), expression(expression_), part_id(part_id_)
|
||||
{
|
||||
children.push_back(input);
|
||||
@ -40,6 +41,10 @@ protected:
|
||||
return res;
|
||||
|
||||
expression->execute(res, part_id);
|
||||
|
||||
if (clear_temporaries)
|
||||
expression->clearTemporaries(res);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -47,6 +52,7 @@ private:
|
||||
BlockInputStreamPtr input;
|
||||
ExpressionPtr expression;
|
||||
unsigned part_id;
|
||||
bool clear_temporaries;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -47,9 +47,14 @@ public:
|
||||
* Функция добавляет в блок новые столбцы - результаты вычислений.
|
||||
* part_id - какую часть выражения вычислять.
|
||||
* Если указано only_consts - то вычисляются только выражения, зависящие от констант.
|
||||
* Если указано clear_temporaries - удалить временные столбцы из блока, которые больше не понадобятся ни для каких вычислений.
|
||||
*/
|
||||
void execute(Block & block, unsigned part_id = 0, bool only_consts = false);
|
||||
|
||||
/** Убрать из блока столбцы, которые больше не нужны.
|
||||
*/
|
||||
void clearTemporaries(Block & block);
|
||||
|
||||
/** Взять из блока с промежуточными результатами вычислений только столбцы, представляющие собой конечный результат.
|
||||
* Переименовать их в алиасы, если они заданы и если параметр without_duplicates_and_aliases = false.
|
||||
* Вернуть новый блок, в котором эти столбцы расположены в правильном порядке.
|
||||
@ -144,6 +149,12 @@ private:
|
||||
|
||||
void markBeforeArrayJoinImpl(ASTPtr ast, unsigned part_id, bool below = false);
|
||||
|
||||
typedef std::set<std::string> NeedColumns;
|
||||
|
||||
void clearTemporariesImpl(ASTPtr ast, Block & block);
|
||||
|
||||
void collectNeedColumns(ASTPtr ast, Block & block, NeedColumns & need_columns, bool top_level = true, bool all_children_need = false);
|
||||
|
||||
/// Получить тип у функции, идентификатора или литерала.
|
||||
DataTypePtr getType(ASTPtr ast);
|
||||
};
|
||||
|
@ -226,13 +226,81 @@ void Expression::execute(Block & block, unsigned part_id, bool only_consts)
|
||||
}
|
||||
|
||||
|
||||
void Expression::clearTemporaries(Block & block)
|
||||
{
|
||||
clearTemporariesImpl(ast, block);
|
||||
}
|
||||
|
||||
|
||||
void Expression::clearTemporariesImpl(ASTPtr ast, Block & block)
|
||||
{
|
||||
/** Очистку ненужных столбцов будем делать так:
|
||||
* - будем собирать нужные столбцы:
|
||||
* - пройдём по выражению;
|
||||
* - корневые столбцы считаем нужными;
|
||||
* - если столбца ещё нет в блоке, то считаем его нужным, и всех его детей тоже.
|
||||
*/
|
||||
|
||||
NeedColumns need_columns;
|
||||
collectNeedColumns(ast, block, need_columns);
|
||||
|
||||
Block cleared_block;
|
||||
|
||||
for (size_t i = 0, columns = block.columns(); i < columns; ++i)
|
||||
if (need_columns.end() != need_columns.find(block.getByPosition(i).name))
|
||||
cleared_block.insert(block.getByPosition(i));
|
||||
|
||||
block = cleared_block;
|
||||
}
|
||||
|
||||
|
||||
void Expression::collectNeedColumns(ASTPtr ast, Block & block, NeedColumns & need_columns, bool top_level, bool all_children_need)
|
||||
{
|
||||
bool is_column =
|
||||
dynamic_cast<ASTIdentifier *>(&*ast)
|
||||
|| dynamic_cast<ASTLiteral *>(&*ast)
|
||||
|| dynamic_cast<ASTFunction *>(&*ast)
|
||||
|| dynamic_cast<ASTSet *>(&*ast);
|
||||
|
||||
bool need_this_column = false;
|
||||
|
||||
if (is_column)
|
||||
{
|
||||
if (all_children_need)
|
||||
{
|
||||
need_this_column = true;
|
||||
}
|
||||
else if (!block.has(ast->getColumnName()))
|
||||
{
|
||||
/// Если столбца ещё нет в блоке, то считаем его нужным, и всех его детей тоже.
|
||||
need_this_column = true;
|
||||
all_children_need = true;
|
||||
}
|
||||
else if (top_level)
|
||||
{
|
||||
/// Корневые столбцы считаем нужными.
|
||||
need_this_column = true;
|
||||
top_level = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (need_this_column)
|
||||
need_columns.insert(ast->getColumnName());
|
||||
|
||||
/// Обход сверху-вниз. Не опускаемся в подзапросы.
|
||||
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
|
||||
if (!dynamic_cast<ASTSelectQuery *>(&**it))
|
||||
collectNeedColumns(*it, block, need_columns, top_level, all_children_need);
|
||||
}
|
||||
|
||||
|
||||
void Expression::executeImpl(ASTPtr ast, Block & block, unsigned part_id, bool only_consts)
|
||||
{
|
||||
/// Если результат вычисления уже есть в блоке.
|
||||
if ((dynamic_cast<ASTFunction *>(&*ast) || dynamic_cast<ASTLiteral *>(&*ast)) && block.has(ast->getColumnName()))
|
||||
return;
|
||||
|
||||
/// Обход в глубину. Не опускаемся в подзапросы.
|
||||
/// Обход снизу-вверх. Не опускаемся в подзапросы.
|
||||
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
|
||||
if (!dynamic_cast<ASTSelectQuery *>(&**it))
|
||||
executeImpl(*it, block, part_id, only_consts);
|
||||
|
@ -272,8 +272,7 @@ void InterpreterSelectQuery::executeWhere(BlockInputStreams & streams, Expressio
|
||||
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
|
||||
{
|
||||
BlockInputStreamPtr & stream = *it;
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_WHERE), is_async);
|
||||
// TODO: Убрать лишние столбцы
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_WHERE, true), is_async);
|
||||
stream = maybeAsynchronous(new FilterBlockInputStream(stream, query.where_expression->getColumnName()), is_async);
|
||||
}
|
||||
}
|
||||
@ -292,7 +291,7 @@ void InterpreterSelectQuery::executeArrayJoin(BlockInputStreams & streams, Expre
|
||||
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
|
||||
{
|
||||
BlockInputStreamPtr & stream = *it;
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_BEFORE_ARRAY_JOIN), is_async);
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_BEFORE_ARRAY_JOIN, true), is_async);
|
||||
stream = maybeAsynchronous(new ArrayJoiningBlockInputStream(stream, array_join_column_name), is_async);
|
||||
}
|
||||
}
|
||||
@ -310,7 +309,7 @@ void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, Exp
|
||||
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
|
||||
{
|
||||
BlockInputStreamPtr & stream = *it;
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING), is_async);
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING, true), is_async);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr & stream = streams[0];
|
||||
@ -360,7 +359,7 @@ void InterpreterSelectQuery::executeHaving(BlockInputStreams & streams, Expressi
|
||||
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
|
||||
{
|
||||
BlockInputStreamPtr & stream = *it;
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_HAVING), is_async);
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_HAVING, true), is_async);
|
||||
stream = maybeAsynchronous(new FilterBlockInputStream(stream, query.having_expression->getColumnName()), is_async);
|
||||
}
|
||||
}
|
||||
@ -378,7 +377,7 @@ void InterpreterSelectQuery::executeOuterExpression(BlockInputStreams & streams,
|
||||
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
|
||||
{
|
||||
BlockInputStreamPtr & stream = *it;
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER), is_async);
|
||||
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER, true), is_async);
|
||||
|
||||
/** Оставим только столбцы, нужные для SELECT и ORDER BY части.
|
||||
* Если нет ORDER BY - то это последняя проекция, и нужно брать только столбцы из SELECT части.
|
||||
|
Loading…
Reference in New Issue
Block a user