Merge pull request #60745 from jrdi/insert-select-deduplication-bug-reproducer

Fix insert-select + insert_deduplication_token bug by setting streams to 1
This commit is contained in:
Nikita Taranov 2024-03-06 21:02:30 +01:00 committed by GitHub
commit f4fd4e6510
3 changed files with 83 additions and 0 deletions

View File

@@ -513,6 +513,16 @@ BlockIO InterpreterInsertQuery::execute()
    const bool resize_to_max_insert_threads = !table->isView() && views.empty();
    pre_streams_size = resize_to_max_insert_threads ? settings.max_insert_threads
                                                    : std::min<size_t>(settings.max_insert_threads, pipeline.getNumStreams());

    /// Deduplication when passing insert_deduplication_token breaks if using more than one thread
    if (!settings.insert_deduplication_token.toString().empty())
    {
        LOG_DEBUG(
            getLogger("InsertQuery"),
            "Insert-select query using insert_deduplication_token, setting streams to 1 to avoid deduplication issues");
        pre_streams_size = 1;
    }

    if (table->supportsParallelInsert())
        sink_streams_size = pre_streams_size;
}
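
A minimal sketch (hypothetical table and token names, not part of this PR) of the two guarantees the test below exercises: an INSERT ... SELECT carrying an insert_deduplication_token must not lose rows to spurious block-level deduplication inside a single multi-threaded insert, and re-running the identical insert with the same token must not add rows.

-- Hypothetical sketch; dedup_sketch and 'sketch-token' are illustrative names only.
DROP TABLE IF EXISTS dedup_sketch;
CREATE TABLE dedup_sketch (id UInt64)
ENGINE = MergeTree()
ORDER BY id
SETTINGS non_replicated_deduplication_window=1000;
-- Multi-threaded insert with a token: all 10 rows must arrive
-- (no blocks wrongly deduplicated against each other within the same insert).
INSERT INTO dedup_sketch SELECT number FROM numbers(10)
SETTINGS insert_deduplicate=1, insert_deduplication_token='sketch-token', max_insert_threads=5;
SELECT count() FROM dedup_sketch; -- expected: 10
-- Re-running the identical insert with the same token must be deduplicated away.
INSERT INTO dedup_sketch SELECT number FROM numbers(10)
SETTINGS insert_deduplicate=1, insert_deduplication_token='sketch-token', max_insert_threads=5;
SELECT count() FROM dedup_sketch; -- expected: still 10
DROP TABLE dedup_sketch;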

View File

@@ -0,0 +1,4 @@
3
6
12
18

View File

@@ -0,0 +1,69 @@
-- Tags: distributed
DROP TABLE IF EXISTS landing SYNC;
DROP TABLE IF EXISTS landing_dist SYNC;
DROP TABLE IF EXISTS ds SYNC;
CREATE TABLE landing
(
    timestamp DateTime64(3),
    status String,
    id String
)
ENGINE = MergeTree()
ORDER BY timestamp;
CREATE TABLE landing_dist
(
    timestamp DateTime64(3),
    status String,
    id String
)
ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), 'landing', rand());
SYSTEM STOP MERGES landing; -- Stopping merges to force 3 parts
INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1;
INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1;
INSERT INTO landing (status, id, timestamp) SELECT * FROM generateRandom() LIMIT 1;
CREATE TABLE ds
(
    timestamp DateTime64(3),
    status String,
    id String
)
ENGINE = MergeTree()
ORDER BY timestamp
SETTINGS non_replicated_deduplication_window=1000;
INSERT INTO ds SELECT * FROM landing
SETTINGS insert_deduplicate=1, insert_deduplication_token='token1',
max_insert_threads=5;
SELECT count() FROM ds;
INSERT INTO ds SELECT * FROM landing
SETTINGS insert_deduplicate=1, insert_deduplication_token='token2',
max_insert_threads=1;
SELECT count() FROM ds;
-- When reading from the distributed table, 6 rows are retrieved
-- because the two-shards cluster is being used
INSERT INTO ds SELECT * FROM landing_dist
SETTINGS insert_deduplicate=1, insert_deduplication_token='token3',
max_insert_threads=5;
SELECT count() FROM ds;
INSERT INTO ds SELECT * FROM landing_dist
SETTINGS insert_deduplicate=1, insert_deduplication_token='token4',
max_insert_threads=1;
SELECT count() FROM ds;
DROP TABLE IF EXISTS landing SYNC;
DROP TABLE IF EXISTS landing_dist SYNC;
DROP TABLE IF EXISTS ds SYNC;