Merge pull request #59544 from jrdi/fix-update-insert-deduplication-token

Fix corner case when passing `update_insert_deduplication_token_in_dependent_materialized_views`
This commit is contained in:
Nikolai Kochetov 2024-02-05 13:03:12 +01:00 committed by GitHub
commit 89bcebfe96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 103 additions and 29 deletions

View File

@ -2097,7 +2097,7 @@ SELECT * FROM test_table
## update_insert_deduplication_token_in_dependent_materialized_views {#update-insert-deduplication-token-in-dependent-materialized-views} ## update_insert_deduplication_token_in_dependent_materialized_views {#update-insert-deduplication-token-in-dependent-materialized-views}
Allows to update `insert_deduplication_token` with table identifier during insert in dependent materialized views, if setting `deduplicate_blocks_in_dependent_materialized_views` is enabled and `insert_deduplication_token` is set. Allows to update `insert_deduplication_token` with view identifier during insert in dependent materialized views, if setting `deduplicate_blocks_in_dependent_materialized_views` is enabled and `insert_deduplication_token` is set.
Possible values: Possible values:

View File

@ -247,30 +247,6 @@ Chain buildPushingToViewsChain(
{ {
insert_context->setSetting("insert_deduplicate", Field{false}); insert_context->setSetting("insert_deduplicate", Field{false});
} }
else if (insert_settings.update_insert_deduplication_token_in_dependent_materialized_views &&
!insert_settings.insert_deduplication_token.value.empty())
{
/** Update deduplication token passed to dependent MV with current table id. So it is possible to properly handle
* deduplication in complex INSERT flows.
*
* Example:
*
* landing ---> mv_1_1 ---> ds_1_1 ---> mv_2_1 ---> ds_2_1 ---> mv_3_1 ---> ds_3_1
* | |
* --> mv_1_2 ---> ds_1_2 ---> mv_2_2 --
*
* Here we want to avoid deduplication for two different blocks generated from `mv_2_1` and `mv_2_2` that will
* be inserted into `ds_2_1`.
*/
auto insert_deduplication_token = insert_settings.insert_deduplication_token.value;
if (table_id.hasUUID())
insert_deduplication_token += "_" + toString(table_id.uuid);
else
insert_deduplication_token += "_" + table_id.getFullNameNotQuoted();
insert_context->setSetting("insert_deduplication_token", insert_deduplication_token);
}
// Processing of blocks for MVs is done block by block, and there will // Processing of blocks for MVs is done block by block, and there will
// be no parallel reading after (plus it is not a costless operation) // be no parallel reading after (plus it is not a costless operation)
@ -327,6 +303,46 @@ Chain buildPushingToViewsChain(
auto & target_name = runtime_stats->target_name; auto & target_name = runtime_stats->target_name;
auto * view_counter_ms = &runtime_stats->elapsed_ms; auto * view_counter_ms = &runtime_stats->elapsed_ms;
const auto & insert_settings = insert_context->getSettingsRef();
ContextMutablePtr view_insert_context = insert_context;
if (!disable_deduplication_for_children &&
insert_settings.update_insert_deduplication_token_in_dependent_materialized_views &&
!insert_settings.insert_deduplication_token.value.empty())
{
/** Update deduplication token passed to dependent MV with current view id. So it is possible to properly handle
* deduplication in complex INSERT flows.
*
* Example:
*
* landing ---> mv_1_1 ---> ds_1_1 ---> mv_2_1 ---> ds_2_1 ---> mv_3_1 ---> ds_3_1
* | |
* --> mv_1_2 ---> ds_1_2 ---> mv_2_2 --
*
* Here we want to avoid deduplication for two different blocks generated from `mv_2_1` and `mv_2_2` that will
* be inserted into `ds_2_1`.
*
* We are forced to use view id instead of table id because there are some possible INSERT flows where no tables
* are involved.
*
* Example:
*
* landing ---> mv_1_1 ---> ds_1_1
* | |
* --> mv_1_2 --
*
*/
auto insert_deduplication_token = insert_settings.insert_deduplication_token.value;
if (view_id.hasUUID())
insert_deduplication_token += "_" + toString(view_id.uuid);
else
insert_deduplication_token += "_" + view_id.getFullNameNotQuoted();
view_insert_context = Context::createCopy(insert_context);
view_insert_context->setSetting("insert_deduplication_token", insert_deduplication_token);
}
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get())) if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get()))
{ {
auto lock = materialized_view->tryLockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout); auto lock = materialized_view->tryLockForShare(context->getInitialQueryId(), context->getSettingsRef().lock_acquire_timeout);
@ -394,7 +410,7 @@ Chain buildPushingToViewsChain(
insert_columns.emplace_back(column.name); insert_columns.emplace_back(column.name);
} }
InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false); InterpreterInsertQuery interpreter(nullptr, view_insert_context, false, false, false);
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms); out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);
out.addStorageHolder(view); out.addStorageHolder(view);
out.addStorageHolder(inner_table); out.addStorageHolder(inner_table);
@ -404,7 +420,7 @@ Chain buildPushingToViewsChain(
runtime_stats->type = QueryViewsLogElement::ViewType::LIVE; runtime_stats->type = QueryViewsLogElement::ViewType::LIVE;
query = live_view->getInnerQuery(); // Used only to log in system.query_views_log query = live_view->getInnerQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsChain( out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(), view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ true, /* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert, storage_header); thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
} }
@ -413,13 +429,13 @@ Chain buildPushingToViewsChain(
runtime_stats->type = QueryViewsLogElement::ViewType::WINDOW; runtime_stats->type = QueryViewsLogElement::ViewType::WINDOW;
query = window_view->getMergeableQuery(); // Used only to log in system.query_views_log query = window_view->getMergeableQuery(); // Used only to log in system.query_views_log
out = buildPushingToViewsChain( out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(), view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ true, /* no_destination= */ true,
thread_status_holder, running_group, view_counter_ms, async_insert); thread_status_holder, running_group, view_counter_ms, async_insert);
} }
else else
out = buildPushingToViewsChain( out = buildPushingToViewsChain(
view, view_metadata_snapshot, insert_context, ASTPtr(), view, view_metadata_snapshot, view_insert_context, ASTPtr(),
/* no_destination= */ false, /* no_destination= */ false,
thread_status_holder, running_group, view_counter_ms, async_insert); thread_status_holder, running_group, view_counter_ms, async_insert);

View File

@ -0,0 +1,5 @@
0
ds_1_1 all_1_1_0 0
ds_1_1 all_2_2_0 0
landing all_1_1_0 0
10

View File

@ -0,0 +1,53 @@
-- Regression test: two materialized views read from the same source table (`landing`)
-- and write into the same target table (`ds_1_1`) while a user-supplied
-- `insert_deduplication_token` is in effect.
--
-- Flow under test:
--
--   landing ---> mv_1_1 ---> ds_1_1
--           |                 |
--           --> mv_1_2 --------
--
-- Session settings: deduplication is enabled for the insert itself and for blocks pushed
-- to dependent materialized views, the token is updated per dependent view, and a fixed
-- token ('test') is attached to every INSERT issued below.
SET insert_deduplicate = 1;
SET deduplicate_blocks_in_dependent_materialized_views = 1;
SET update_insert_deduplication_token_in_dependent_materialized_views = 1;
SET insert_deduplication_token = 'test';
-- Source table that receives the INSERTs. A non-zero deduplication window is configured
-- on this non-replicated MergeTree table.
DROP TABLE IF EXISTS landing;
CREATE TABLE landing
(
timestamp UInt64,
value UInt64
)
ENGINE = MergeTree ORDER BY tuple() SETTINGS non_replicated_deduplication_window = 1000;
-- Shared target table for both materialized views, with the same deduplication window.
DROP TABLE IF EXISTS ds_1_1;
CREATE TABLE ds_1_1
(
t UInt64,
v UInt64
)
ENGINE = MergeTree ORDER BY tuple() SETTINGS non_replicated_deduplication_window = 1000;
-- First view: aggregates `landing` rows per timestamp and writes into `ds_1_1`.
DROP VIEW IF EXISTS mv_1_1;
CREATE MATERIALIZED VIEW mv_1_1 TO ds_1_1 as
SELECT
timestamp t, sum(value) v
FROM landing
GROUP BY t;
-- Second view: same query and same target as mv_1_1, so both views emit the same data
-- for any given insert into `landing`.
DROP VIEW IF EXISTS mv_1_2;
CREATE MATERIALIZED VIEW mv_1_2 TO ds_1_1 as
SELECT
timestamp t, sum(value) v
FROM landing
GROUP BY t;
-- First insert: 10 identical rows, carrying the session token 'test'.
INSERT INTO landing SELECT 1 as timestamp, 1 AS value FROM numbers(10);
-- NOTE(review): the reason for the 3 second pause is not stated in this script; presumably
-- it separates the two inserts in time -- confirm. The result row of this SELECT is part
-- of the test output.
SELECT sleep(3);
-- Second insert: identical statement under the same token as the first one.
INSERT INTO landing SELECT 1 as timestamp, 1 AS value FROM numbers(10);
-- Flush system logs before reading system.part_log.
SYSTEM FLUSH LOGS;
-- List the part_log entries recorded for this database, ordered deterministically.
-- NOTE(review): presumably the expectation is one part for `landing` and two parts for
-- `ds_1_1` (one per view) -- verify against the reference output.
SELECT table, name, error FROM system.part_log
WHERE database = currentDatabase()
ORDER BY table, name;
-- Row count in the source table after both inserts.
-- NOTE(review): presumably 10 (second insert deduplicated by token) -- verify against the
-- reference output.
SELECT count() FROM landing;
-- Cleanup.
DROP TABLE landing;
DROP TABLE ds_1_1;
DROP VIEW mv_1_1;
DROP VIEW mv_1_2;