Merge branch 'new-branch' of https://github.com/ucasFL/ClickHouse into ucasFL-new-branch

This commit is contained in:
Alexey Milovidov 2020-08-24 16:26:08 +03:00
commit dfe870e38c
7 changed files with 147 additions and 9 deletions

View File

@ -396,6 +396,7 @@ class IColumn;
\
M(Bool, force_optimize_skip_unused_shards_no_nested, false, "Obsolete setting, does nothing. Will be removed after 2020-12-01. Use force_optimize_skip_unused_shards_nesting instead.", 0) \
M(Bool, experimental_use_processors, true, "Obsolete setting, does nothing. Will be removed after 2020-11-29.", 0) \
M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \
M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0)
#define FORMAT_FACTORY_SETTINGS(M) \

View File

@ -0,0 +1,15 @@
#pragma once
// .h autogenerated by cmake!
#define USE_RE2_ST 0
#define USE_SSL 0
#define USE_HDFS 0
#define USE_AWS_S3 0
#define USE_BROTLI 0
#define USE_UNWIND 0
#define USE_OPENCL 0
#define USE_CASSANDRA 0
#define USE_SENTRY 0
#define USE_GRPC 0
#define CLICKHOUSE_SPLIT_BINARY 0

View File

@ -0,0 +1,50 @@
#pragma once
// .h autogenerated by cmake!
#define USE_DBMS_TCP_PROTOCOL_VERSION 0
#if USE_DBMS_TCP_PROTOCOL_VERSION
#include "Core/Defines.h"
#ifndef VERSION_REVISION
#define VERSION_REVISION DBMS_TCP_PROTOCOL_VERSION
#endif
#else
/* #undef VERSION_REVISION */
#endif
#define VERSION_NAME "Project"
#define DBMS_NAME VERSION_NAME
/* #undef VERSION_MAJOR */
/* #undef VERSION_MINOR */
/* #undef VERSION_PATCH */
/* #undef VERSION_STRING */
#define VERSION_STRING_SHORT "."
/* #undef VERSION_OFFICIAL */
#define VERSION_FULL "Project "
/* #undef VERSION_DESCRIBE */
/* #undef VERSION_GITHASH */
#define VERSION_INTEGER ERROR
#if defined(VERSION_MAJOR)
#define DBMS_VERSION_MAJOR VERSION_MAJOR
#else
#define DBMS_VERSION_MAJOR 0
#endif
#if defined(VERSION_MINOR)
#define DBMS_VERSION_MINOR VERSION_MINOR
#else
#define DBMS_VERSION_MINOR 0
#endif
#if defined(VERSION_PATCH)
#define DBMS_VERSION_PATCH VERSION_PATCH
#else
#define DBMS_VERSION_PATCH 0
#endif
#if !defined(VERSION_OFFICIAL)
# define VERSION_OFFICIAL ""
#endif

View File

@ -0,0 +1,11 @@
#pragma once
// .h autogenerated by cmake!
#define USE_ICU 0
#define USE_MYSQL 0
#define USE_RDKAFKA 0
#define USE_EMBEDDED_COMPILER 0
#define USE_INTERNAL_LLVM_LIBRARY 0
#define USE_SSL 0
#define USE_OPENCL 0

View File

@ -1,5 +1,6 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Access/AccessFlags.h>
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
@ -14,20 +15,20 @@
#include <IO/ConcatReadBuffer.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Access/AccessFlags.h>
#include <Interpreters/JoinedTables.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/NullSink.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/checkStackSize.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/NullSink.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SinkToOutputStream.h>
namespace DB
@ -200,9 +201,68 @@ BlockIO InterpreterInsertQuery::execute()
size_t out_streams_size = 1;
if (query.select)
{
auto optimize_trivial_insert_select = settings.optimize_trivial_insert_select;
if (optimize_trivial_insert_select)
{
auto is_trivial_select = [](const auto && query_)
{
if (query_.tables())
{
const auto & tables_in_select_query = query_.tables()->template as<ASTTablesInSelectQuery &>();
for (const auto & child : tables_in_select_query.children)
{
const auto & table_element = child->template as<ASTTablesInSelectQueryElement &>();
const auto & table_expr = table_element.table_expression->template as<ASTTableExpression &>();
if (table_expr.subquery)
{
return false;
}
}
}
return (!query_.distinct && !query_.limit_with_ties && !query_.prewhere() && !query_.where() && !query_.groupBy()
&& !query_.having() && !query_.orderBy() && !query_.limitBy());
};
const auto & ast = query.select->as<ASTSelectWithUnionQuery &>();
size_t num_selects = ast.list_of_selects->children.size();
bool is_trivial_query = true;
for (size_t query_num = 0; query_num < num_selects; ++query_num)
{
const auto & query_select = ast.list_of_selects->children.at(query_num)->as<ASTSelectQuery &>();
if (!is_trivial_select(std::move(query_select)))
{
is_trivial_query = false;
break;
}
}
if (is_trivial_query)
{
auto new_context = context;
UInt64 max_threads = settings.max_insert_threads;
UInt64 min_insert_block_size_rows = settings.min_insert_block_size_rows;
new_context.setSetting("max_threads", std::max<UInt64>(1, max_threads));
if (min_insert_block_size_rows)
new_context.setSetting("max_block_size", min_insert_block_size_rows);
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
InterpreterSelectWithUnionQuery interpreter_select{
query.select, new_context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
}
else
{
InterpreterSelectWithUnionQuery interpreter_select{
query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
}
}
else
{
InterpreterSelectWithUnionQuery interpreter_select{
query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
}
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());

View File

@ -1,6 +1,7 @@
DROP TABLE IF EXISTS numbers_squashed;
CREATE TABLE numbers_squashed AS system.numbers ENGINE = StripeLog;
SET optimize_trivial_insert_select = 'false';
SET max_block_size = 10000;
SET min_insert_block_size_rows = 1000000;

View File

@ -78,7 +78,7 @@ insert into data_01278 select
from numbers(100000); -- { serverError 241; }
EOL
} | {
execute --max_memory_usage=$TEST_01278_MEMORY "$@"
execute --max_memory_usage=$TEST_01278_MEMORY --optimize_trivial_insert_select='false' "$@"
}
echo 'select count() from out_01278' | execute
}