diff --git a/src/Core/Settings.h b/src/Core/Settings.h index f7438b356d5..3ba86add2df 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -396,6 +396,7 @@ class IColumn; \ M(Bool, force_optimize_skip_unused_shards_no_nested, false, "Obsolete setting, does nothing. Will be removed after 2020-12-01. Use force_optimize_skip_unused_shards_nesting instead.", 0) \ M(Bool, experimental_use_processors, true, "Obsolete setting, does nothing. Will be removed after 2020-11-29.", 0) \ + M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \ M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0) #define FORMAT_FACTORY_SETTINGS(M) \ diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 6e29b4fede8..12ee22b20bb 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -14,6 +14,7 @@ #include #include + namespace DB { @@ -33,7 +34,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( * but it's clear that here is not the best place for this functionality. 
*/ addTableLock( - storage->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout)); + storage->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout)); /// If the "root" table deduplicates blocks, there are no need to make deduplication for children /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks @@ -57,9 +58,9 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( insert_context->setSetting("insert_deduplicate", false); // Separate min_insert_block_size_rows/min_insert_block_size_bytes for children - if (insert_settings.min_insert_block_size_rows_for_materialized_views.changed) + if (insert_settings.min_insert_block_size_rows_for_materialized_views) insert_context->setSetting("min_insert_block_size_rows", insert_settings.min_insert_block_size_rows_for_materialized_views.value); - if (insert_settings.min_insert_block_size_bytes_for_materialized_views.changed) + if (insert_settings.min_insert_block_size_bytes_for_materialized_views) insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value); } @@ -74,7 +75,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( if (auto * materialized_view = dynamic_cast(dependent_table.get())) { addTableLock( - materialized_view->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout)); + materialized_view->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout)); StoragePtr inner_table = materialized_view->getTargetTable(); auto inner_table_id = inner_table->getStorageID(); @@ -86,15 +87,17 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( /// Get list of columns we get from select query. 
auto header = InterpreterSelectQuery(query, *select_context, SelectQueryOptions().analyze()) - .getSampleBlock(); + .getSampleBlock(); /// Insert only columns returned by select. auto list = std::make_shared(); const auto & inner_table_columns = inner_metadata_snapshot->getColumns(); - for (auto & column : header) + for (const auto & column : header) + { /// But skip columns which storage doesn't have. if (inner_table_columns.hasPhysical(column.name)) list->children.emplace_back(std::make_shared(column.name)); + } insert->columns = std::move(list); diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 31a623e82fd..1557f065b32 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -14,20 +15,20 @@ #include #include #include -#include #include #include #include #include #include +#include #include +#include +#include +#include +#include #include #include #include -#include -#include -#include -#include namespace DB @@ -92,7 +93,8 @@ Block InterpreterInsertQuery::getSampleBlock( /// The table does not have a column with that name if (!table_sample.has(current_name)) - throw Exception("No such column " + current_name + " in table " + query.table_id.getNameForLogs(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + throw Exception("No such column " + current_name + " in table " + query.table_id.getNameForLogs(), + ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); if (!allow_materialized && !table_sample_non_materialized.has(current_name)) throw Exception("Cannot insert column " + current_name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN); @@ -105,6 +107,40 @@ Block InterpreterInsertQuery::getSampleBlock( } +/** A query that just reads all data without any complex computations or filtering. + * If we just pipe the result to INSERT, we don't have to use too many threads for read. 
+ */ +static bool isTrivialSelect(const ASTSelectQuery & select_query) +{ + const auto & tables = select_query.tables(); + + if (!tables) + return false; + + const auto & tables_in_select_query = tables->as(); + + if (tables_in_select_query.children.size() != 1) + return false; + + const auto & child = tables_in_select_query.children.front(); + const auto & table_element = child->as(); + const auto & table_expr = table_element.table_expression->as(); + + if (table_expr.subquery) + return false; + + /// Note: how to write it in more generic way? + return (!select_query.distinct + && !select_query.limit_with_ties + && !select_query.prewhere() + && !select_query.where() + && !select_query.groupBy() + && !select_query.having() + && !select_query.orderBy() + && !select_query.limitBy()); +}; + + BlockIO InterpreterInsertQuery::execute() { const Settings & settings = context.getSettingsRef(); @@ -200,9 +236,47 @@ BlockIO InterpreterInsertQuery::execute() size_t out_streams_size = 1; if (query.select) { - /// Passing 1 as subquery_depth will disable limiting size of intermediate result. - InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)}; - res = interpreter_select.execute(); + bool is_trivial_insert_select = false; + + if (settings.optimize_trivial_insert_select) + { + const auto & selects = query.select->as().list_of_selects->children; + + is_trivial_insert_select = std::all_of(selects.begin(), selects.end(), [](const ASTPtr & select) + { + return isTrivialSelect(select->as()); + }); + } + + if (is_trivial_insert_select) + { + /** When doing trivial INSERT INTO ... SELECT ... FROM table, + * don't need to process SELECT with more than max_insert_threads + * and it's reasonable to set block size for SELECT to the desired block size for INSERT + * to avoid unnecessary squashing. 
+ */ + + Settings new_settings = context.getSettings(); + + new_settings.max_threads = std::max(1, settings.max_insert_threads); + + if (settings.min_insert_block_size_rows) + new_settings.max_block_size = settings.min_insert_block_size_rows; + + Context new_context = context; + new_context.setSettings(new_settings); + + InterpreterSelectWithUnionQuery interpreter_select{ + query.select, new_context, SelectQueryOptions(QueryProcessingStage::Complete, 1)}; + res = interpreter_select.execute(); + } + else + { + /// Passing 1 as subquery_depth will disable limiting size of intermediate result. + InterpreterSelectWithUnionQuery interpreter_select{ + query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)}; + res = interpreter_select.execute(); + } if (table->supportsParallelInsert() && settings.max_insert_threads > 1) out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams()); diff --git a/tests/performance/point_in_polygon.xml b/tests/performance/point_in_polygon.xml index a1ef4891577..403c2d62cba 100644 --- a/tests/performance/point_in_polygon.xml +++ b/tests/performance/point_in_polygon.xml @@ -1,4 +1,8 @@ + + 0 + + CREATE TABLE polygons (polygon Array(Array(Tuple(Float64, Float64)))) ENGINE = Memory INSERT INTO polygons diff --git a/tests/queries/0_stateless/00340_squashing_insert_select.sql b/tests/queries/0_stateless/00340_squashing_insert_select.sql index 65b82f4b4f9..102eb061bad 100644 --- a/tests/queries/0_stateless/00340_squashing_insert_select.sql +++ b/tests/queries/0_stateless/00340_squashing_insert_select.sql @@ -1,6 +1,7 @@ DROP TABLE IF EXISTS numbers_squashed; CREATE TABLE numbers_squashed AS system.numbers ENGINE = StripeLog; +SET optimize_trivial_insert_select = 'false'; SET max_block_size = 10000; SET min_insert_block_size_rows = 1000000; diff --git a/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.sh 
b/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.sh index ec450b47432..2dc3d695937 100755 --- a/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.sh +++ b/tests/queries/0_stateless/01278_min_insert_block_size_rows_for_materialized_views.sh @@ -78,7 +78,7 @@ insert into data_01278 select from numbers(100000); -- { serverError 241; } EOL } | { - execute --max_memory_usage=$TEST_01278_MEMORY "$@" + execute --max_memory_usage=$TEST_01278_MEMORY --optimize_trivial_insert_select='false' "$@" } echo 'select count() from out_01278' | execute } diff --git a/tests/queries/0_stateless/01455_optimize_trivial_insert_select.reference b/tests/queries/0_stateless/01455_optimize_trivial_insert_select.reference new file mode 100644 index 00000000000..ebe978eef95 --- /dev/null +++ b/tests/queries/0_stateless/01455_optimize_trivial_insert_select.reference @@ -0,0 +1,2 @@ +1000000 0 +1000000 1 diff --git a/tests/queries/0_stateless/01455_optimize_trivial_insert_select.sql b/tests/queries/0_stateless/01455_optimize_trivial_insert_select.sql new file mode 100644 index 00000000000..de470fe6a57 --- /dev/null +++ b/tests/queries/0_stateless/01455_optimize_trivial_insert_select.sql @@ -0,0 +1,7 @@ +SET max_insert_threads = 1, max_threads = 100, min_insert_block_size_rows = 1048576, max_block_size = 65536; +CREATE TEMPORARY TABLE t (x UInt64); +-- For trivial INSERT SELECT, max_threads is lowered to max_insert_threads and max_block_size is changed to min_insert_block_size_rows. +INSERT INTO t SELECT * FROM numbers_mt(1000000); +SET max_threads = 1; +-- If data was inserted by more threads, we will probably see data out of order. +SELECT DISTINCT blockSize(), runningDifference(x) FROM t;