dbms: development [#CONV-5097].

This commit is contained in:
Alexey Milovidov 2012-06-21 19:51:20 +00:00
parent f6b6f488b3
commit b4a40177ec

View File

@ -234,11 +234,12 @@ void InterpreterSelectQuery::executeWhere(BlockInputStreams & streams, Expressio
{
setPartID(query.where_expression, PART_WHERE);
bool is_async = context.settings.asynchronous && streams.size() <= context.settings.max_threads;
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_WHERE), context.settings.asynchronous);
stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous);
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_WHERE), is_async);
stream = maybeAsynchronous(new FilterBlockInputStream(stream), is_async);
}
}
}
@ -251,10 +252,11 @@ void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, Exp
if (query.group_expression_list)
setPartID(query.group_expression_list, PART_GROUP);
bool is_async = context.settings.asynchronous && streams.size() <= context.settings.max_threads;
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING), context.settings.asynchronous);
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING), is_async);
}
BlockInputStreamPtr & stream = streams[0];
@ -300,11 +302,12 @@ void InterpreterSelectQuery::executeHaving(BlockInputStreams & streams, Expressi
{
setPartID(query.having_expression, PART_HAVING);
bool is_async = context.settings.asynchronous && streams.size() <= context.settings.max_threads;
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_HAVING), context.settings.asynchronous);
stream = maybeAsynchronous(new FilterBlockInputStream(stream), context.settings.asynchronous);
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_HAVING), is_async);
stream = maybeAsynchronous(new FilterBlockInputStream(stream), is_async);
}
}
}
@ -317,10 +320,11 @@ void InterpreterSelectQuery::executeOuterExpression(BlockInputStreams & streams,
if (query.order_expression_list)
setPartID(query.order_expression_list, PART_ORDER);
bool is_async = context.settings.asynchronous && streams.size() <= context.settings.max_threads;
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER), context.settings.asynchronous);
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER), is_async);
/** Оставим только столбцы, нужные для SELECT и ORDER BY части.
* Если нет ORDER BY - то это последняя проекция, и нужно брать только столбцы из SELECT части.
@ -348,10 +352,11 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams, Expressio
order_descr.push_back(SortColumnDescription(name, dynamic_cast<ASTOrderByElement &>(**it).direction));
}
bool is_async = context.settings.asynchronous && streams.size() <= context.settings.max_threads;
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = maybeAsynchronous(new PartialSortingBlockInputStream(stream, order_descr), context.settings.asynchronous);
stream = maybeAsynchronous(new PartialSortingBlockInputStream(stream, order_descr), is_async);
}
BlockInputStreamPtr & stream = streams[0];