Merge pull request #59699 from jkartseva/disable_async_inserts_for_dependent_mv_dedup

Insert synchronously if dependent MV deduplication is enabled
This commit is contained in:
Julia Kartseva 2024-02-08 19:23:51 -08:00 committed by GitHub
commit 2286184885
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 50 additions and 1 deletions

View File

@ -933,6 +933,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
reason = "asynchronous insert queue is not configured";
else if (insert_query->select)
reason = "insert query has select";
else if (settings.deduplicate_blocks_in_dependent_materialized_views)
reason = "dependent materialized views block deduplication is enabled";
else if (insert_query->hasInlinedData())
async_insert = true;

View File

@ -933,7 +933,7 @@ void TCPHandler::processInsertQuery()
if (auto table = DatabaseCatalog::instance().tryGetTable(insert_query.table_id, query_context))
async_insert_enabled |= table->areAsynchronousInsertsEnabled();
if (insert_queue && async_insert_enabled && !insert_query.select)
if (insert_queue && async_insert_enabled && !insert_query.select && !settings.deduplicate_blocks_in_dependent_materialized_views)
{
auto result = processAsyncInsertQuery(*insert_queue);
if (result.status == AsynchronousInsertQueue::PushResult::OK)

View File

@ -0,0 +1 @@
Values Ok 4 Parsed

View File

@ -0,0 +1,46 @@
-- Tags: no-parallel
SET async_insert = 1;
SET insert_deduplicate = 1;
SET deduplicate_blocks_in_dependent_materialized_views = 1;
DROP TABLE IF EXISTS 02985_test;
CREATE TABLE 02985_test
(
d Date,
value UInt64
) ENGINE = MergeTree ORDER BY tuple() SETTINGS non_replicated_deduplication_window = 1000;
DROP VIEW IF EXISTS 02985_mv;
CREATE MATERIALIZED VIEW 02985_mv
ENGINE = SummingMergeTree ORDER BY d AS
SELECT
d, sum(value) s
FROM 02985_test GROUP BY d;
-- Inserts are synchronous.
INSERT INTO 02985_test (*)
VALUES ('2024-01-01', 1), ('2024-01-01', 2), ('2024-01-02', 1);
SYSTEM FLUSH LOGS;
SELECT format, status, rows, data_kind FROM system.asynchronous_insert_log
WHERE database = currentDatabase() AND table = '02985_test';
SET deduplicate_blocks_in_dependent_materialized_views = 0;
-- Set a large value for async_insert_busy_timeout_max_ms to avoid flushing the entry synchronously.
INSERT INTO 02985_test (*)
SETTINGS
async_insert_busy_timeout_min_ms=200,
async_insert_busy_timeout_max_ms=100000
VALUES ('2024-01-01', 1), ('2024-01-01', 2), ('2024-01-02', 1), ('2024-01-02', 4);
SYSTEM FLUSH LOGS;
SELECT format, status, rows, data_kind
FROM system.asynchronous_insert_log
WHERE database = currentDatabase() AND table = '02985_test';
DROP VIEW IF EXISTS 02985_mv;
DROP TABLE IF EXISTS 02985_test;