optimize trivial insert select

style fix

update

fix

fix

fix
This commit is contained in:
feng lv 2020-07-07 17:24:49 +08:00
parent 8d65deaf85
commit 2cce811dda

View File

@ -1,5 +1,6 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Access/AccessFlags.h>
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
@ -14,20 +15,20 @@
#include <IO/ConcatReadBuffer.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Access/AccessFlags.h>
#include <Interpreters/JoinedTables.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/NullSink.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/checkStackSize.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/NullSink.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SinkToOutputStream.h>
namespace DB
@ -200,9 +201,60 @@ BlockIO InterpreterInsertQuery::execute()
size_t out_streams_size = 1;
if (query.select)
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
auto is_trivial_select = [](const auto query_)
{
if (query_.tables())
{
const auto & tables_in_select_query = query_.tables()->template as<ASTTablesInSelectQuery &>();
for (const auto & child : tables_in_select_query.children)
{
const auto & table_element = child->template as<ASTTablesInSelectQueryElement &>();
const auto & table_expr = table_element.table_expression->template as<ASTTableExpression &>();
if (table_expr.subquery)
{
return false;
}
}
}
if (!query_.distinct && !query_.limit_with_ties && !query_.prewhere()
&& !query_.where() && !query_.groupBy() && !query_.having() && !query_.orderBy() && !query_.limitBy())
return true;
return false;
};
const auto & ast = query.select->as<ASTSelectWithUnionQuery &>();
size_t num_selects = ast.list_of_selects->children.size();
bool is_trivial_query = true;
for (size_t query_num = 0; query_num < num_selects; ++query_num)
{
const auto & query_select = ast.list_of_selects->children.at(query_num)->as<ASTSelectQuery &>();
if (!is_trivial_select(query_select))
{
is_trivial_query = false;
break;
}
}
if (is_trivial_query)
{
auto new_context = context;
UInt64 max_threads = settings.max_insert_threads;
UInt64 min_insert_block_size_rows = settings.min_insert_block_size_rows;
new_context.setSetting("max_threads", std::max<UInt64>(1, max_threads));
if (min_insert_block_size_rows)
new_context.setSetting("max_block_size", min_insert_block_size_rows);
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{
query.select, new_context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
}
else
{
InterpreterSelectWithUnionQuery interpreter_select{
query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
}
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());