dbms: Server: execute queries from views with as most parallelism as possible [#METR-14408]

This commit is contained in:
Alexey Arno 2014-12-19 18:56:12 +03:00
parent ef3c9190e7
commit 5d23310cea
3 changed files with 32 additions and 29 deletions

View File

@ -63,6 +63,9 @@ public:
*/
BlockInputStreamPtr execute();
/// Выполнить запрос без объединения потоков.
BlockInputStreams executeWithoutUnion();
/** Выполнить запрос, записать результат в нужном формате в buf.
* BlockInputStreamPtr возвращается, чтобы можно было потом получить информацию о плане выполнения запроса.
*/
@ -75,7 +78,7 @@ private:
typedef Poco::SharedPtr<ExpressionAnalyzer> ExpressionAnalyzerPtr;
void init(BlockInputStreamPtr input, const NamesAndTypesList & table_column_names = NamesAndTypesList());
/// Выполнить один запрос SELECT из цепочки UNION ALL.
void executeSingleQuery(bool should_perform_union_hint = true);

View File

@ -225,33 +225,14 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
: in;
}
BlockInputStreamPtr InterpreterSelectQuery::execute()
{
if (isFirstSelectInsideUnionAll())
{
executeSingleQuery(false);
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
p->executeSingleQuery(false);
const auto & others = p->streams;
streams.insert(streams.end(), others.begin(), others.end());
}
if (streams.empty())
return new NullBlockInputStream;
for (auto & stream : streams)
stream = new MaterializingBlockInputStream(stream);
executeUnion(streams);
}
else
{
executeSingleQuery();
if (streams.empty())
return new NullBlockInputStream;
}
{
(void) executeWithoutUnion();
if (streams.empty())
return new NullBlockInputStream;
executeUnion(streams);
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
@ -276,6 +257,26 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
return streams[0];
}
BlockInputStreams InterpreterSelectQuery::executeWithoutUnion()
{
if (isFirstSelectInsideUnionAll())
{
executeSingleQuery(false);
for (auto p = next_select_in_union_all.get(); p != nullptr; p = p->next_select_in_union_all.get())
{
p->executeSingleQuery(false);
const auto & others = p->streams;
streams.insert(streams.end(), others.begin(), others.end());
}
for (auto & stream : streams)
stream = new MaterializingBlockInputStream(stream);
}
else
executeSingleQuery();
return streams;
}
void InterpreterSelectQuery::executeSingleQuery(bool should_perform_union_hint)
{

View File

@ -92,8 +92,7 @@ BlockInputStreams StorageView::read(
if (outer_select.final && !inner_select.final)
inner_select.final = outer_select.final;
return BlockInputStreams(1,
InterpreterSelectQuery(inner_query_clone, context, column_names).execute());
return InterpreterSelectQuery(inner_query_clone, context, column_names).executeWithoutUnion();
}