Merge pull request #14005 from ClickHouse/ucasFL-new-branch

Merging #12195
This commit is contained in:
alexey-milovidov 2020-08-26 01:33:54 +03:00 committed by GitHub
commit 00c697df06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 108 additions and 16 deletions

View File

@ -396,6 +396,7 @@ class IColumn;
\
M(Bool, force_optimize_skip_unused_shards_no_nested, false, "Obsolete setting, does nothing. Will be removed after 2020-12-01. Use force_optimize_skip_unused_shards_nesting instead.", 0) \
M(Bool, experimental_use_processors, true, "Obsolete setting, does nothing. Will be removed after 2020-11-29.", 0) \
M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \
M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0)
#define FORMAT_FACTORY_SETTINGS(M) \

View File

@ -14,6 +14,7 @@
#include <Storages/StorageValues.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
{
@ -33,7 +34,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
* but it's clear that here is not the best place for this functionality.
*/
addTableLock(
storage->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
storage->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
/// If the "root" table deduplicates blocks, there are no need to make deduplication for children
/// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks
@ -57,9 +58,9 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
insert_context->setSetting("insert_deduplicate", false);
// Separate min_insert_block_size_rows/min_insert_block_size_bytes for children
if (insert_settings.min_insert_block_size_rows_for_materialized_views.changed)
if (insert_settings.min_insert_block_size_rows_for_materialized_views)
insert_context->setSetting("min_insert_block_size_rows", insert_settings.min_insert_block_size_rows_for_materialized_views.value);
if (insert_settings.min_insert_block_size_bytes_for_materialized_views.changed)
if (insert_settings.min_insert_block_size_bytes_for_materialized_views)
insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value);
}
@ -74,7 +75,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
addTableLock(
materialized_view->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
materialized_view->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout));
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
@ -86,15 +87,17 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
/// Get list of columns we get from select query.
auto header = InterpreterSelectQuery(query, *select_context, SelectQueryOptions().analyze())
.getSampleBlock();
.getSampleBlock();
/// Insert only columns returned by select.
auto list = std::make_shared<ASTExpressionList>();
const auto & inner_table_columns = inner_metadata_snapshot->getColumns();
for (auto & column : header)
for (const auto & column : header)
{
/// But skip columns which storage doesn't have.
if (inner_table_columns.hasPhysical(column.name))
list->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
}
insert->columns = std::move(list);

View File

@ -1,5 +1,6 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Access/AccessFlags.h>
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
@ -14,20 +15,20 @@
#include <IO/ConcatReadBuffer.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Access/AccessFlags.h>
#include <Interpreters/JoinedTables.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/NullSink.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/checkStackSize.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/NullSink.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SinkToOutputStream.h>
namespace DB
@ -92,7 +93,8 @@ Block InterpreterInsertQuery::getSampleBlock(
/// The table does not have a column with that name
if (!table_sample.has(current_name))
throw Exception("No such column " + current_name + " in table " + query.table_id.getNameForLogs(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
throw Exception("No such column " + current_name + " in table " + query.table_id.getNameForLogs(),
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (!allow_materialized && !table_sample_non_materialized.has(current_name))
throw Exception("Cannot insert column " + current_name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
@ -105,6 +107,40 @@ Block InterpreterInsertQuery::getSampleBlock(
}
/** A query that just reads all data without any complex computations or filetering.
* If we just pipe the result to INSERT, we don't have to use too many threads for read.
*/
static bool isTrivialSelect(const ASTSelectQuery & select_query)
{
const auto & tables = select_query.tables();
if (!tables)
return false;
const auto & tables_in_select_query = tables->as<ASTTablesInSelectQuery &>();
if (tables_in_select_query.children.size() != 1)
return false;
const auto & child = tables_in_select_query.children.front();
const auto & table_element = child->as<ASTTablesInSelectQueryElement &>();
const auto & table_expr = table_element.table_expression->as<ASTTableExpression &>();
if (table_expr.subquery)
return false;
/// Note: how to write it in more generic way?
return (!select_query.distinct
&& !select_query.limit_with_ties
&& !select_query.prewhere()
&& !select_query.where()
&& !select_query.groupBy()
&& !select_query.having()
&& !select_query.orderBy()
&& !select_query.limitBy());
};
BlockIO InterpreterInsertQuery::execute()
{
const Settings & settings = context.getSettingsRef();
@ -200,9 +236,47 @@ BlockIO InterpreterInsertQuery::execute()
size_t out_streams_size = 1;
if (query.select)
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
bool is_trivial_insert_select = false;
if (settings.optimize_trivial_insert_select)
{
const auto & selects = query.select->as<ASTSelectWithUnionQuery &>().list_of_selects->children;
is_trivial_insert_select = std::all_of(selects.begin(), selects.end(), [](const ASTPtr & select)
{
return isTrivialSelect(select->as<ASTSelectQuery &>());
});
}
if (is_trivial_insert_select)
{
/** When doing trivial INSERT INTO ... SELECT ... FROM table,
* don't need to process SELECT with more than max_insert_threads
* and it's reasonable to set block size for SELECT to the desired block size for INSERT
* to avoid unnecessary squashing.
*/
Settings new_settings = context.getSettings();
new_settings.max_threads = std::max<UInt64>(1, settings.max_insert_threads);
if (settings.min_insert_block_size_rows)
new_settings.max_block_size = settings.min_insert_block_size_rows;
Context new_context = context;
new_context.setSettings(new_settings);
InterpreterSelectWithUnionQuery interpreter_select{
query.select, new_context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
}
else
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{
query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res = interpreter_select.execute();
}
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());

View File

@ -1,4 +1,8 @@
<test>
<settings>
<optimize_trivial_insert_select>0</optimize_trivial_insert_select>
</settings>
<create_query>CREATE TABLE polygons (polygon Array(Array(Tuple(Float64, Float64)))) ENGINE = Memory</create_query>
<create_query>
INSERT INTO polygons

View File

@ -1,6 +1,7 @@
DROP TABLE IF EXISTS numbers_squashed;
CREATE TABLE numbers_squashed AS system.numbers ENGINE = StripeLog;
SET optimize_trivial_insert_select = 'false';
SET max_block_size = 10000;
SET min_insert_block_size_rows = 1000000;

View File

@ -78,7 +78,7 @@ insert into data_01278 select
from numbers(100000); -- { serverError 241; }
EOL
} | {
execute --max_memory_usage=$TEST_01278_MEMORY "$@"
execute --max_memory_usage=$TEST_01278_MEMORY --optimize_trivial_insert_select='false' "$@"
}
echo 'select count() from out_01278' | execute
}

View File

@ -0,0 +1,2 @@
1000000 0
1000000 1

View File

@ -0,0 +1,7 @@
SET max_insert_threads = 1, max_threads = 100, min_insert_block_size_rows = 1048576, max_block_size = 65536;
CREATE TEMPORARY TABLE t (x UInt64);
-- For trivial INSERT SELECT, max_threads is lowered to max_insert_threads and max_block_size is changed to min_insert_block_size_rows.
INSERT INTO t SELECT * FROM numbers_mt(1000000);
SET max_threads = 1;
-- If data was inserted by more threads, we will probably see data out of order.
SELECT DISTINCT blockSize(), runningDifference(x) FROM t;